Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kotlin-sdk/api/android/kotlin-sdk.api
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public final class dev/openfeature/kotlin/sdk/OpenFeatureAPI {
public final fun getProvider ()Ldev/openfeature/kotlin/sdk/FeatureProvider;
public final fun getProviderMetadata ()Ldev/openfeature/kotlin/sdk/ProviderMetadata;
public final fun getProvidersFlow ()Lkotlinx/coroutines/flow/MutableStateFlow;
public final fun getSharedProviderEvents ()Lkotlinx/coroutines/flow/SharedFlow;
public final fun getStatus ()Ldev/openfeature/kotlin/sdk/OpenFeatureStatus;
public final fun getStatusFlow ()Lkotlinx/coroutines/flow/Flow;
public final fun setEvaluationContext (Ldev/openfeature/kotlin/sdk/EvaluationContext;Lkotlinx/coroutines/CoroutineDispatcher;)V
Expand Down
1 change: 1 addition & 0 deletions kotlin-sdk/api/jvm/kotlin-sdk.api
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public final class dev/openfeature/kotlin/sdk/OpenFeatureAPI {
public final fun getProvider ()Ldev/openfeature/kotlin/sdk/FeatureProvider;
public final fun getProviderMetadata ()Ldev/openfeature/kotlin/sdk/ProviderMetadata;
public final fun getProvidersFlow ()Lkotlinx/coroutines/flow/MutableStateFlow;
public final fun getSharedProviderEvents ()Lkotlinx/coroutines/flow/SharedFlow;
public final fun getStatus ()Ldev/openfeature/kotlin/sdk/OpenFeatureStatus;
public final fun getStatusFlow ()Lkotlinx/coroutines/flow/Flow;
public final fun setEvaluationContext (Ldev/openfeature/kotlin/sdk/EvaluationContext;Lkotlinx/coroutines/CoroutineDispatcher;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,39 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.PublishedApi

@Suppress("TooManyFunctions")
object OpenFeatureAPI {
private var setProviderJob: Job? = null
private var setEvaluationContextJob: Job? = null
private var observeProviderEventsJob: Job? = null

private val providerMutex = Mutex()
private val NOOP_PROVIDER = NoOpProvider()
private var provider: FeatureProvider = NOOP_PROVIDER
private var context: EvaluationContext? = null
val providersFlow: MutableStateFlow<FeatureProvider> = MutableStateFlow(NOOP_PROVIDER)

/**
* [Dispatchers.Unconfined] keeps the shared [observe] collector on the emitting thread when possible.
*/
private val providerEventsScope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using Dispatchers.Unconfined for a global event scope can lead to unpredictable behavior if subscribers perform heavy work or cause recursive emissions, as they will execute on the emitting thread. Consider using Dispatchers.Default to offload this work and ensure the provider remains responsive.

Suggested change
private val providerEventsScope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined)
private val providerEventsScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

Copy link
Copy Markdown
Contributor Author

@maxnrp maxnrp May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried with Default before (can check gemini's insights on this marked as resolved above), but tests looked messy, and I thought that Unconfined wasn't a bad option, may seen unstable but the jumping from thread to thread is kinda appealing, isn't?

Though as you suggested in a comment, now that API is becoming a class we could just pass Dispatcher as an argument eventually. I've updated the tests, just to keep the PR merge-ready, if #243 goes first, we may retouch it again.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not hardcode any dispatchers to make the code testable. In #243 this object will become a class, so we could take the opportunity to set the Dispatcher as a constructor argument.


@PublishedApi
@OptIn(ExperimentalCoroutinesApi::class)
internal val sharedProviderEvents =
providersFlow
.flatMapLatest { it.observe() }
.shareIn(providerEventsScope, SharingStarted.Eagerly, replay = 0)
Comment thread
maxnrp marked this conversation as resolved.

private val _statusFlow: MutableSharedFlow<OpenFeatureStatus> =
MutableSharedFlow<OpenFeatureStatus>(replay = 1, extraBufferCapacity = 5)
.apply {
Expand All @@ -44,6 +58,35 @@ object OpenFeatureAPI {
var hooks: List<Hook<*>> = listOf()
private set

/**
* Aligning the state management to
* https://openfeature.dev/specification/sections/events#requirement-535
*/
private val handleProviderEvents: FlowCollector<OpenFeatureProviderEvents> = FlowCollector { providerEvent ->
when (providerEvent) {
is OpenFeatureProviderEvents.ProviderReady -> {
_statusFlow.emit(OpenFeatureStatus.Ready)
}

is OpenFeatureProviderEvents.ProviderStale -> {
_statusFlow.emit(OpenFeatureStatus.Stale)
}

is OpenFeatureProviderEvents.ProviderError -> {
_statusFlow.emit(providerEvent.toOpenFeatureStatusError())
}

else -> { // All other states should not be emitted from here
}
}
}

init {
providerEventsScope.launch {
Comment thread
maxnrp marked this conversation as resolved.
Outdated
sharedProviderEvents.collect(handleProviderEvents)
}
}

/**
* Set the [FeatureProvider] for the SDK. This method will return immediately and initialize the provider in a coroutine scope
* When the provider is successfully initialized it will set the status to Ready.
Expand All @@ -63,7 +106,7 @@ object OpenFeatureAPI {
) {
setProviderJob?.cancel(CancellationException("Provider set job was cancelled due to new provider"))
this.setProviderJob = CoroutineScope(SupervisorJob() + dispatcher).launch {
setProviderInternal(provider, dispatcher, initialContext)
setProviderInternal(provider, initialContext)
}
}

Expand All @@ -72,26 +115,19 @@ object OpenFeatureAPI {
*
* @param provider the [FeatureProvider] to set
* @param initialContext the initial [EvaluationContext] to use for the provider initialization. Defaults to an null context if not set.
* @param dispatcher retained for API compatibility (no longer used for a separate provider observe job).
Comment thread
maxnrp marked this conversation as resolved.
Outdated
*/
@Suppress("UNUSED_PARAMETER")
suspend fun setProviderAndWait(
provider: FeatureProvider,
initialContext: EvaluationContext? = null,
dispatcher: CoroutineDispatcher = Dispatchers.Default
) {
setProviderInternal(provider, dispatcher, initialContext)
}

private fun listenToProviderEvents(provider: FeatureProvider, dispatcher: CoroutineDispatcher) {
observeProviderEventsJob?.cancel(CancellationException("Provider job was cancelled due to new provider"))
this.observeProviderEventsJob = CoroutineScope(SupervisorJob() + dispatcher).launch {
provider.observe().collect(handleProviderEvents)
}
setProviderInternal(provider, initialContext)
}

@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun setProviderInternal(
provider: FeatureProvider,
dispatcher: CoroutineDispatcher,
initialContext: EvaluationContext? = null
) {
// Atomically swap the old and new provider to prevent race conditions
Expand All @@ -113,7 +149,6 @@ object OpenFeatureAPI {

// Initialize the new provider
tryWithStatusEmitErrorHandling {
listenToProviderEvents(provider, dispatcher)
getProvider().initialize(context)
_statusFlow.emit(OpenFeatureStatus.Ready)
}
Expand Down Expand Up @@ -249,9 +284,6 @@ object OpenFeatureAPI {
clearHooks()
setEvaluationContextJob?.cancel(CancellationException("Set context job was cancelled due to shutdown"))
setProviderJob?.cancel(CancellationException("Provider set job was cancelled due to shutdown"))
observeProviderEventsJob?.cancel(
CancellationException("Provider event observe job was cancelled due to shutdown")
)
clearProvider()
}

Expand All @@ -263,30 +295,6 @@ object OpenFeatureAPI {
/**
* Observe events of type [T] from the currently configured [FeatureProvider].
*/
@OptIn(ExperimentalCoroutinesApi::class)
inline fun <reified T : OpenFeatureProviderEvents> observe(): Flow<T> = providersFlow
.flatMapLatest { it.observe() }.filterIsInstance()

/**
* Aligning the state management to
* https://openfeature.dev/specification/sections/events#requirement-535
*/
private val handleProviderEvents: FlowCollector<OpenFeatureProviderEvents> = FlowCollector { providerEvent ->
when (providerEvent) {
is OpenFeatureProviderEvents.ProviderReady -> {
_statusFlow.emit(OpenFeatureStatus.Ready)
}

is OpenFeatureProviderEvents.ProviderStale -> {
_statusFlow.emit(OpenFeatureStatus.Stale)
}

is OpenFeatureProviderEvents.ProviderError -> {
_statusFlow.emit(providerEvent.toOpenFeatureStatusError())
}

else -> { // All other states should not be emitted from here
}
}
}
inline fun <reified T : OpenFeatureProviderEvents> observe(): Flow<T> =
sharedProviderEvents.filterIsInstance()
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import dev.openfeature.kotlin.sdk.events.OpenFeatureProviderEvents
import dev.openfeature.kotlin.sdk.exceptions.ErrorCode
import dev.openfeature.kotlin.sdk.exceptions.OpenFeatureError
import dev.openfeature.kotlin.sdk.exceptions.OpenFeatureError.GeneralError
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow

private val typeMatchingException =
Expand All @@ -30,7 +29,6 @@ class OpenFeatureClient(

override val statusFlow = openFeatureAPI.statusFlow

@OptIn(ExperimentalCoroutinesApi::class)
override fun observe(): Flow<OpenFeatureProviderEvents> =
openFeatureAPI.observe<OpenFeatureProviderEvents>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,10 @@ class DeveloperExperienceTests {
provider,
initialContext = ImmutableContext("first")
)
testScheduler.advanceUntilIdle()
// emits ProviderStale + ProviderStale + ProviderStale
OpenFeatureAPI.getClient().track("hello-world")
testScheduler.advanceUntilIdle()

// emits ProviderStale + ProviderConfigurationChanged
OpenFeatureAPI.setEvaluationContextAndWait(ImmutableContext("second"))
Expand Down
Loading