feat: Centralize event distribution#246
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a shared flow for provider events in the OpenFeatureAPI, utilizing the shareIn operator to centralize event observation. While this simplifies the public API and reduces overhead for multiple observers, the implementation hardcodes Dispatchers.Default, which has introduced the need for real-time delays in the test suite to ensure deterministic behavior. Feedback suggests making the dispatcher configurable to improve test reliability and further unifying internal status tracking with this shared flow to eliminate redundant subscriptions to the underlying provider events.
56cf79b to
e61ebd7
Compare
Signed-off-by: Max Pinheiro <max.pinheiro@fluxon.com>
e61ebd7 to
59db46c
Compare
Signed-off-by: Max Pinheiro <max.pinheiro@fluxon.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request refactors event management in OpenFeatureAPI by introducing a shared flow to handle provider events globally, replacing the previous per-provider observation logic. Key changes include the addition of sharedProviderEvents and the use of a dedicated CoroutineScope for event collection. Feedback identifies that using Dispatchers.Unconfined for the event scope may lead to unpredictable behavior and suggests switching to Dispatchers.Default. Additionally, there is a noted discrepancy between the implementation and the PR description regarding the sharing strategy, which may cause late subscribers to miss events.
| /** | ||
| * [Dispatchers.Unconfined] keeps the shared [observe] collector on the emitting thread when possible. | ||
| */ | ||
| private val providerEventsScope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined) |
There was a problem hiding this comment.
Using Dispatchers.Unconfined for a global event scope can lead to unpredictable behavior if subscribers perform heavy work or cause recursive emissions, as they will execute on the emitting thread. Consider using Dispatchers.Default to offload this work and ensure the provider remains responsive.
| private val providerEventsScope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined) | |
| private val providerEventsScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) |
There was a problem hiding this comment.
I've tried with Default before (can check gemini's insights on this marked as resolved above), but tests looked messy, and I thought that Unconfined wasn't a bad option, may seen unstable but the jumping from thread to thread is kinda appealing, isn't?
Though as you suggested in a comment, now that API is becoming a class we could just pass Dispatcher as an argument eventually. I've updated the tests, just to keep the PR merge-ready, if #243 goes first, we may retouch it again.
bencehornak
left a comment
There was a problem hiding this comment.
First batch of thoughts :)
| /** | ||
| * [Dispatchers.Unconfined] keeps the shared [observe] collector on the emitting thread when possible. | ||
| */ | ||
| private val providerEventsScope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined) |
There was a problem hiding this comment.
We should not hardcode any dispatchers to make the code testable. In #243 this object will become a class, so we could take the opportunity to set the Dispatcher as a constructor argument.
Signed-off-by: Max Pinheiro <max.pinheiro@fluxon.com>
Signed-off-by: Max Pinheiro <max.pinheiro@fluxon.com>
Signed-off-by: Max Pinheiro <max.pinheiro@fluxon.com>
69015ea to
1dc461b
Compare
Intent
Fix redundant
FeatureProvider.observe()subscriptions when multiple callers collectOpenFeatureAPI.observe()/Client.observe()(#245).Motivation
Each collector used to drive its own
flatMapLatestintoprovider.observe()— which can be an overhead for providers whereobserve()is costly.Changes
shareInonprovidersFlow.flatMapLatest { it.observe() }.shareIn(providerEventsScope, SharingStarted.Eagerly, replay = 0).Tests
flushDispatchersDefault(durationMs):runTestdoesn’t pumpDispatchers.Default, whereshareInruns.Existing event-order / client–API parity / filter tests kept; they still define expected behavior.
Breaking changes
None intended. Concurrent collectors now share one hot stream (replay = 0);