Skip to content

Commit a63d85e

Browse files
committed
fix(core): harden ProviderRepository concurrency (#185)
harden ProviderRepository concurrency, lock inversions, and backpressure testing Signed-off-by: Marcin Stepien <marcin.stepien@fluxon.com>
1 parent 680810b commit a63d85e

5 files changed

Lines changed: 220 additions & 32 deletions

File tree

kotlin-sdk/api/android/kotlin-sdk.api

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public final class dev/openfeature/kotlin/sdk/DomainState {
3939
public final fun getStatus ()Ldev/openfeature/kotlin/sdk/OpenFeatureStatus;
4040
public final fun getStatusFlow ()Lkotlinx/coroutines/flow/Flow;
4141
public final fun get_statusFlow ()Lkotlinx/coroutines/flow/MutableSharedFlow;
42-
public final fun initializeListener (Lkotlinx/coroutines/CoroutineDispatcher;)V
42+
public final fun initializeListener (Lkotlinx/coroutines/CoroutineDispatcher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
4343
public final fun setContext (Ldev/openfeature/kotlin/sdk/EvaluationContext;)V
4444
public final fun setSetEvaluationContextJob (Lkotlinx/coroutines/Job;)V
4545
public final fun setSetProviderJob (Lkotlinx/coroutines/Job;)V

kotlin-sdk/api/jvm/kotlin-sdk.api

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public final class dev/openfeature/kotlin/sdk/DomainState {
3939
public final fun getStatus ()Ldev/openfeature/kotlin/sdk/OpenFeatureStatus;
4040
public final fun getStatusFlow ()Lkotlinx/coroutines/flow/Flow;
4141
public final fun get_statusFlow ()Lkotlinx/coroutines/flow/MutableSharedFlow;
42-
public final fun initializeListener (Lkotlinx/coroutines/CoroutineDispatcher;)V
42+
public final fun initializeListener (Lkotlinx/coroutines/CoroutineDispatcher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
4343
public final fun setContext (Ldev/openfeature/kotlin/sdk/EvaluationContext;)V
4444
public final fun setSetEvaluationContextJob (Lkotlinx/coroutines/Job;)V
4545
public final fun setSetProviderJob (Lkotlinx/coroutines/Job;)V

kotlin-sdk/src/commonMain/kotlin/dev/openfeature/kotlin/sdk/ProviderRepository.kt

Lines changed: 71 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,29 @@ package dev.openfeature.kotlin.sdk
22

33
import dev.openfeature.kotlin.sdk.events.OpenFeatureProviderEvents
44
import dev.openfeature.kotlin.sdk.events.toOpenFeatureStatusError
5+
import dev.openfeature.kotlin.sdk.exceptions.ErrorCode
56
import kotlinx.coroutines.CancellationException
67
import kotlinx.coroutines.CoroutineDispatcher
78
import kotlinx.coroutines.CoroutineScope
89
import kotlinx.coroutines.Job
910
import kotlinx.coroutines.SupervisorJob
1011
import kotlinx.coroutines.cancel
12+
import kotlinx.coroutines.channels.BufferOverflow
13+
import kotlinx.coroutines.delay
1114
import kotlinx.coroutines.flow.Flow
1215
import kotlinx.coroutines.flow.MutableSharedFlow
1316
import kotlinx.coroutines.flow.MutableStateFlow
14-
import kotlinx.coroutines.flow.collectLatest
1517
import kotlinx.coroutines.flow.distinctUntilChanged
16-
import kotlinx.coroutines.flow.firstOrNull
18+
import kotlinx.coroutines.flow.flatMapLatest
19+
import kotlinx.coroutines.flow.launchIn
1720
import kotlinx.coroutines.flow.map
21+
import kotlinx.coroutines.flow.onEach
22+
import kotlinx.coroutines.flow.retryWhen
1823
import kotlinx.coroutines.flow.update
19-
import kotlinx.coroutines.launch
2024
import kotlinx.coroutines.sync.Mutex
2125
import kotlinx.coroutines.sync.withLock
26+
import kotlinx.coroutines.withContext
27+
import kotlin.concurrent.Volatile
2228

2329
class DomainState {
2430
var setProviderJob: Job? = null
@@ -30,35 +36,64 @@ class DomainState {
3036
var context: EvaluationContext? = null
3137

3238
val _statusFlow: MutableSharedFlow<OpenFeatureStatus> =
33-
MutableSharedFlow<OpenFeatureStatus>(replay = 1, extraBufferCapacity = 5).apply {
39+
MutableSharedFlow<OpenFeatureStatus>(
40+
replay = 1,
41+
extraBufferCapacity = 5,
42+
onBufferOverflow = BufferOverflow.DROP_OLDEST
43+
).apply {
3444
tryEmit(OpenFeatureStatus.NotReady)
3545
}
3646

3747
val statusFlow: Flow<OpenFeatureStatus> get() = _statusFlow.distinctUntilChanged()
3848

3949
private var domainScope: CoroutineScope? = null
4050

41-
fun initializeListener(dispatcher: CoroutineDispatcher) {
42-
if (domainScope != null) return
43-
domainScope = CoroutineScope(SupervisorJob() + dispatcher).apply {
44-
launch {
45-
providersFlow.collectLatest { currentProvider ->
46-
currentProvider.observe().collect { providerEvent ->
47-
when (providerEvent) {
48-
is OpenFeatureProviderEvents.ProviderReady -> {
49-
emitStatus(OpenFeatureStatus.Ready)
50-
}
51-
is OpenFeatureProviderEvents.ProviderStale -> {
52-
emitStatus(OpenFeatureStatus.Stale)
53-
}
54-
is OpenFeatureProviderEvents.ProviderError -> {
55-
emitStatus(providerEvent.toOpenFeatureStatusError())
56-
}
57-
else -> { // All other states should not be emitted from here
58-
}
51+
@Volatile
52+
private var currentDispatcher: CoroutineDispatcher? = null
53+
54+
suspend fun initializeListener(dispatcher: CoroutineDispatcher) {
55+
providerMutex.withLock {
56+
currentDispatcher = dispatcher
57+
58+
if (domainScope != null) return@withLock
59+
val scope = CoroutineScope(SupervisorJob() + dispatcher)
60+
domainScope = scope
61+
62+
providersFlow
63+
.flatMapLatest { currentProvider ->
64+
currentProvider.observe()
65+
.retryWhen { cause, _ ->
66+
emit(
67+
OpenFeatureProviderEvents.ProviderError(
68+
OpenFeatureProviderEvents.EventDetails(
69+
message = cause.message ?: "Provider observe() crashed",
70+
errorCode = ErrorCode.GENERAL
71+
)
72+
)
73+
)
74+
delay(3000L)
75+
true
76+
}
77+
.map { event -> currentProvider to event }
78+
}
79+
.onEach { (currentProvider, providerEvent) ->
80+
val activeDispatcher = currentDispatcher ?: dispatcher
81+
withContext(activeDispatcher) {
82+
if (providersFlow.value === currentProvider) {
83+
processProviderEvent(providerEvent)
5984
}
6085
}
6186
}
87+
.launchIn(scope)
88+
}
89+
}
90+
91+
private suspend fun processProviderEvent(event: OpenFeatureProviderEvents) {
92+
when (event) {
93+
is OpenFeatureProviderEvents.ProviderReady -> emitStatus(OpenFeatureStatus.Ready)
94+
is OpenFeatureProviderEvents.ProviderStale -> emitStatus(OpenFeatureStatus.Stale)
95+
is OpenFeatureProviderEvents.ProviderError -> emitStatus(event.toOpenFeatureStatusError())
96+
else -> { // All other states should not be emitted from here
6297
}
6398
}
6499
}
@@ -72,11 +107,14 @@ class DomainState {
72107
suspend fun shutdown() {
73108
setProviderJob?.cancel(CancellationException("Provider set job was cancelled due to shutdown"))
74109
setEvaluationContextJob?.cancel(CancellationException("Set context job was cancelled due to shutdown"))
75-
domainScope?.cancel(CancellationException("DomainScope was cancelled due to shutdown"))
76-
domainScope = null
110+
providerMutex.withLock {
111+
domainScope?.cancel(CancellationException("DomainScope was cancelled due to shutdown"))
112+
domainScope = null
113+
currentDispatcher = null
114+
}
77115
provider.shutdown()
78116
providersFlow.value = NoOpProvider()
79-
_statusFlow.emit(OpenFeatureStatus.NotReady)
117+
_statusFlow.tryEmit(OpenFeatureStatus.NotReady)
80118
}
81119
}
82120

@@ -110,12 +148,16 @@ class ProviderRepository {
110148
}
111149

112150
suspend fun clearAll() {
113-
repositoryMutex.withLock {
151+
// Evaluate and detach existing states safely
152+
val allStatesToShutdown = repositoryMutex.withLock {
114153
val all = getAllStates()
115-
all.forEach { state ->
116-
state.shutdown()
117-
}
118154
domainsFlow.value = emptyMap()
155+
all
156+
}
157+
158+
// Shutdown cleanly outside the repository lock to prevent lock-inversion deadlocks!
159+
allStatesToShutdown.forEach { state ->
160+
state.shutdown()
119161
}
120162
}
121163
}

kotlin-sdk/src/commonTest/kotlin/dev/openfeature/kotlin/sdk/DomainE2ETest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import dev.openfeature.kotlin.sdk.helpers.BrokenInitProvider
55
import dev.openfeature.kotlin.sdk.helpers.DoSomethingProvider
66
import kotlinx.coroutines.ExperimentalCoroutinesApi
77
import kotlinx.coroutines.cancelAndJoin
8+
import kotlinx.coroutines.delay
89
import kotlinx.coroutines.flow.MutableSharedFlow
910
import kotlinx.coroutines.launch
1011
import kotlinx.coroutines.test.currentTime
1112
import kotlinx.coroutines.test.runTest
12-
import kotlinx.coroutines.delay
1313
import kotlin.test.AfterTest
1414
import kotlin.test.Test
1515
import kotlin.test.assertEquals

kotlin-sdk/src/commonTest/kotlin/dev/openfeature/kotlin/sdk/ProviderRepositoryTest.kt

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,20 @@ package dev.openfeature.kotlin.sdk
33
import dev.openfeature.kotlin.sdk.events.OpenFeatureProviderEvents
44
import dev.openfeature.kotlin.sdk.exceptions.OpenFeatureError
55
import kotlinx.coroutines.ExperimentalCoroutinesApi
6+
import kotlinx.coroutines.delay
67
import kotlinx.coroutines.flow.MutableSharedFlow
78
import kotlinx.coroutines.flow.first
9+
import kotlinx.coroutines.flow.flow
810
import kotlinx.coroutines.launch
11+
import kotlinx.coroutines.sync.withLock
912
import kotlinx.coroutines.test.StandardTestDispatcher
1013
import kotlinx.coroutines.test.advanceUntilIdle
1114
import kotlinx.coroutines.test.runTest
1215
import kotlin.test.Test
1316
import kotlin.test.assertEquals
1417
import kotlin.test.assertNotEquals
1518
import kotlin.test.assertSame
19+
import kotlin.test.assertTrue
1620

1721
@OptIn(ExperimentalCoroutinesApi::class)
1822
class ProviderRepositoryTest {
@@ -157,4 +161,146 @@ class ProviderRepositoryTest {
157161
// Verify the newly mapped listener successfully overrides state
158162
assertEquals(OpenFeatureStatus.Stale, state.getStatus())
159163
}
164+
165+
@Test
166+
fun `DomainState should restart event listener if dispatcher changes`() = runTest {
167+
val state = DomainState()
168+
169+
val dispatcher1 = StandardTestDispatcher(testScheduler)
170+
val dispatcher2 = StandardTestDispatcher(testScheduler)
171+
172+
val providerEvents = MutableSharedFlow<OpenFeatureProviderEvents>()
173+
class MockEventProvider : FeatureProvider by NoOpProvider() {
174+
override fun observe() = providerEvents
175+
}
176+
val provider = MockEventProvider()
177+
state.providersFlow.value = provider
178+
179+
// Step 1: Initialize with dispatcher1
180+
state.initializeListener(dispatcher1)
181+
testScheduler.advanceUntilIdle()
182+
183+
// Emit event to verify dispatcher1 listener is working
184+
providerEvents.emit(OpenFeatureProviderEvents.ProviderReady())
185+
testScheduler.advanceUntilIdle()
186+
assertEquals(OpenFeatureStatus.Ready, state.getStatus())
187+
188+
// Step 2: Initialize with the EXACT SAME dispatcher, shouldn't disrupt anything
189+
state.initializeListener(dispatcher1)
190+
testScheduler.advanceUntilIdle()
191+
192+
// Step 3: Initialize with a DIFFERENT dispatcher (dispatcher2)
193+
state.initializeListener(dispatcher2)
194+
testScheduler.advanceUntilIdle()
195+
196+
// Fire another event, it should be processed by the NEW listener that was just hot-swapped
197+
providerEvents.emit(OpenFeatureProviderEvents.ProviderStale())
198+
testScheduler.advanceUntilIdle()
199+
assertEquals(OpenFeatureStatus.Stale, state.getStatus())
200+
}
201+
202+
@Test
203+
fun `DomainState should automatically retry and emit error on provider unhandled exception`() = runTest {
204+
val state = DomainState()
205+
val errorMsg = "Simulated internal crash"
206+
var observeCallCount = 0
207+
208+
class UnstableProvider : FeatureProvider by NoOpProvider() {
209+
override fun observe(): kotlinx.coroutines.flow.Flow<OpenFeatureProviderEvents> = flow {
210+
observeCallCount++
211+
throw RuntimeException(errorMsg)
212+
}
213+
}
214+
215+
state.providersFlow.value = UnstableProvider()
216+
val testDispatcher = StandardTestDispatcher(testScheduler)
217+
state.initializeListener(testDispatcher)
218+
219+
testScheduler.advanceTimeBy(100L) // Process first crash without entering infinite loop
220+
221+
val status = state.getStatus()
222+
assertTrue(status is OpenFeatureStatus.Error)
223+
assertEquals(errorMsg, status.error.message)
224+
assertEquals(1, observeCallCount)
225+
226+
// Then, advance by 3000ms. It should trigger retryWhen delay and retry
227+
testScheduler.advanceTimeBy(3500L)
228+
// Ensure it re-observed!
229+
assertEquals(2, observeCallCount)
230+
231+
// Cancel scope explicitly to avoid hanging the `runTest` finalizer loop
232+
state.shutdown()
233+
testScheduler.advanceUntilIdle()
234+
}
235+
236+
@Test
237+
fun `DomainState should drop oldest status seamlessly and avoid suspending backpressure when blasted`() = runTest {
238+
val state = DomainState()
239+
val eventsFlow = MutableSharedFlow<OpenFeatureProviderEvents>()
240+
241+
class SpammyProvider : FeatureProvider by NoOpProvider() {
242+
override fun observe() = eventsFlow
243+
}
244+
245+
state.providersFlow.value = SpammyProvider()
246+
val testDispatcher = StandardTestDispatcher(testScheduler)
247+
state.initializeListener(testDispatcher)
248+
testScheduler.advanceUntilIdle()
249+
250+
// Mimic a slow processor that purposefully does not collect from statusFlow.
251+
val slowSubscriber = launch(StandardTestDispatcher(testScheduler)) {
252+
state.statusFlow.collect {
253+
delay(10000L) // Extremely slow
254+
}
255+
}
256+
testScheduler.advanceUntilIdle()
257+
258+
// Blast 50 items simultaneously into eventsFlow -> emitStatus.
259+
// If it was BufferOverflow.SUSPEND this wouldn't finish.
260+
val job = launch(StandardTestDispatcher(testScheduler)) {
261+
for (i in 1..50) {
262+
eventsFlow.emit(
263+
if (i % 2 == 0) {
264+
OpenFeatureProviderEvents.ProviderReady()
265+
} else {
266+
OpenFeatureProviderEvents.ProviderStale()
267+
}
268+
)
269+
}
270+
}
271+
testScheduler.advanceUntilIdle()
272+
273+
// Assert the job actually finished and didn't hang
274+
assertTrue(job.isCompleted)
275+
slowSubscriber.cancel()
276+
277+
// Assert the state correctly processed them
278+
val finalStatus = state.getStatus()
279+
assertTrue(finalStatus is OpenFeatureStatus.Ready || finalStatus is OpenFeatureStatus.Stale)
280+
}
281+
282+
@Test
283+
fun `ProviderRepository clearAll should not deadlock against busy domain state shutdowns`() = runTest {
284+
val repository = ProviderRepository()
285+
val state = repository.getOrCreateState("deadlock-domain")
286+
287+
// Thread A: Holds providerMutex and tries to get repositoryMutex
288+
val threadA = launch(StandardTestDispatcher(testScheduler)) {
289+
state.providerMutex.withLock {
290+
delay(50L) // Wait to ensure Thread B traps repositoryMutex
291+
repository.getOrCreateState("new-domain") // Wants repositoryMutex
292+
}
293+
}
294+
295+
// Thread B: Runs clearAll
296+
val threadB = launch(StandardTestDispatcher(testScheduler)) {
297+
repository.clearAll() // Holds repositoryMutex initially, then wants providerMutex (via shutdown)
298+
}
299+
300+
testScheduler.advanceUntilIdle()
301+
302+
// If the vulnerability exists, both threads will be permanently blocked!
303+
assertTrue(threadA.isCompleted)
304+
assertTrue(threadB.isCompleted)
305+
}
160306
}

0 commit comments

Comments
 (0)