Testing all emissions in StateFlow without losing any #3939

Open
@dkhalanskyjb

Problem statement

A very common pain point when using our testing framework is that StateFlow conflates values emitted in quick succession.

Example (all examples are written without Turbine, DI frameworks, or other niceties to show the most universal form that doesn't rely on any technologies but the ones available in our library):

fun testStateFlow() = runTest {
    val flow = MutableStateFlow(0)
    // try to observe values [0; 10] in `flow`
    var i = 0
    val collector = launch {
        flow.collect {
            check(i == it) { "Expected $i but got $it" }
            ++i
            if (i > 10) this@launch.cancel() // stop only after the last expected value, 10, was checked
        }
    }
    // send values [1; 10] to `flow`
    repeat(10) {
        flow.value = it + 1
    }
    // wait for the collector to check the values that were sent
    collector.join()
}

This test will fail with Expected 0 but got 10.

The reason is that there is no buffer inside StateFlow that stores the values that were emitted. Instead, only the latest value gets stored. runTest uses only a single thread, so while repeat(10) is running, the collector is waiting for its turn before it can start collecting values. The first moment when it gets its turn is collector.join(). Only then does the collector finally have a chance to check which values were sent—but at that point, StateFlow contains 10.
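The conflation can be made visible by recording what a collector actually receives. The following is a minimal sketch, assuming kotlinx-coroutines-test on the JVM; the helper name observedValues is made up for illustration, and delay(1) is there only to let the collector take its turn.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest

// Hypothetical helper: returns every value the collector actually received.
fun observedValues(): List<Int> {
    val seen = mutableListOf<Int>()
    runTest {
        val flow = MutableStateFlow(0)
        // Only enqueued here; the collector does not run until the test body suspends.
        val collector = launch { flow.collect { seen += it } }
        // All ten assignments happen before the collector gets its turn.
        repeat(10) { flow.value = it + 1 }
        // Suspend the test body so that the collector can run (virtual time, no real waiting).
        delay(1)
        collector.cancel()
    }
    return seen
}
```

In this sketch, observedValues() should return a list containing only 10: the intermediate values were overwritten before anyone looked at them.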

This issue was reported to us many times: #3120, #3367, #3339, #3519, #3143, https://slack-chats.kotlinlang.org/t/16039814/hey-all-wave-i-have-been-trying-to-test-the-intermediate-sta#9c1503b6-6da5-4e21-9aef-8b03c0dc1dbc, https://slack-chats.kotlinlang.org/t/16056041/is-there-any-interest-in-calling-out-that-stateflow-instance#2878e145-9e65-4095-9112-1da730105076, https://www.billjings.com/posts/title/testing-stateflow-is-annoying/?up= ...

Affected code

The typical use case we get approached with is similar to the following.

System under test is:

interface NetworkCaller {
  suspend fun getPage(uri: String): String
}

class AboutPageContents(
  private val scope: CoroutineScope,
  private val networkCaller: NetworkCaller,
) {
  private val _stateFlow = MutableStateFlow<State>(State.EMPTY)
  val state: StateFlow<State> = _stateFlow

  fun startLoading() = scope.launch {
    _stateFlow.value = State.LOADING
    val contents = networkCaller.getPage("https://neocities.org/about")
    _stateFlow.value = State.HAS_CONTENTS(contents)
  }

  sealed interface State {
    object EMPTY: State
    object LOADING: State
    data class HAS_CONTENTS(val contents: String): State
  }
}

The test:

@Test
fun testAboutPageContentsLoading() = runTest {
    val fakeNetworkCaller = object: NetworkCaller {
        override suspend fun getPage(uri: String): String = "<html></html>"
    }
    val pageContents = AboutPageContents(this, fakeNetworkCaller)
    val collector = launch {
        var expectedState: AboutPageContents.State = AboutPageContents.State.EMPTY
        pageContents.state.collect {
            when (val currentState = expectedState) {
                AboutPageContents.State.EMPTY -> {
                    check(it === AboutPageContents.State.EMPTY) { "Expected EMPTY, got $it" }
                    expectedState = AboutPageContents.State.LOADING
                }
                AboutPageContents.State.LOADING -> {
                    check(it === AboutPageContents.State.LOADING) { "Expected LOADING, got $it" }
                    expectedState = AboutPageContents.State.HAS_CONTENTS("<html></html>")
                }
                is AboutPageContents.State.HAS_CONTENTS -> {
                    check(it is AboutPageContents.State.HAS_CONTENTS) { "Expected HAS_CONTENTS, got $it" }
                    check(it.contents == currentState.contents)
                    this@launch.cancel()
                }
            }
        }
    }
    pageContents.startLoading()
    collector.join()
}

The test tries to observe every step of the process of loading a web page: initially, it's empty; after calling startLoading, it's supposed to enter the LOADING state, and finally, after the page has loaded, we get HAS_CONTENTS. However, this test fails with Expected LOADING, got HAS_CONTENTS(contents=<html></html>): due to conflation, startLoading went from LOADING to HAS_CONTENTS before collect had a chance to process the LOADING state.

Fix

The problem with the above code is here:

    val fakeNetworkCaller = object: NetworkCaller {
        override suspend fun getPage(uri: String): String = "<html></html>"
    }

This should be replaced with

    val fakeNetworkCaller = object: NetworkCaller {
        override suspend fun getPage(uri: String): String {
            delay(25.milliseconds)
            return "<html></html>"
        }
    }

The reasoning is as follows:

  • The test will test exactly what you tell it to test. If the fake network call doesn't have a delay in its implementation, it's happening instantly. If it really happened instantly in real life, the user wouldn't be able to see the spinner or any other visual indication of the LOADING state: the state would get conflated, and we'd observe going from EMPTY to HAS_CONTENTS immediately, which is exactly what the test correctly shows. By inserting delay, we clarify that we want to emulate the case when the network call took a noticeably long time.
  • In fact, it may be worth testing the behavior of the system in both scenarios: how will it react if the network call is slow (and the spinner is visible), and how will it react if the network call is surprisingly fast (and the UI only sees the EMPTY and HAS_CONTENTS emissions in production).
  • If delay(25.milliseconds) executes on a test dispatcher rather than on a real dispatcher such as Dispatchers.IO, it is skipped using virtual time and completes instantly. So, this delay call has no impact on how long the tests take to run.
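The two scenarios from the list above (a slow and an instant network call) can be compared side by side. This is a minimal sketch, not the code from this issue: Loader is a hypothetical, reduced stand-in for AboutPageContents that uses plain strings as states, and statesSeen is a made-up helper.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical stand-in for AboutPageContents, with strings instead of a sealed interface.
class Loader(private val scope: CoroutineScope, private val fetch: suspend () -> String) {
    private val _state = MutableStateFlow("EMPTY")
    val state: StateFlow<String> = _state

    fun startLoading() = scope.launch {
        _state.value = "LOADING"
        val contents = fetch()
        _state.value = "HAS_CONTENTS($contents)"
    }
}

// Hypothetical helper: records the states a collector sees for a given fake network delay.
fun statesSeen(networkDelay: Duration): List<String> {
    val seen = mutableListOf<String>()
    runTest {
        val loader = Loader(this) {
            delay(networkDelay) // a non-positive delay returns without suspending
            "<html></html>"
        }
        val collector = launch { loader.state.collect { seen += it } }
        // Wait until loading has finished; the collector runs while the test body is suspended.
        loader.startLoading().join()
        collector.cancel()
    }
    return seen
}
```

Under these assumptions, statesSeen(25.milliseconds) should yield EMPTY, LOADING, and HAS_CONTENTS(<html></html>), whereas statesSeen(Duration.ZERO) should yield only EMPTY and HAS_CONTENTS(<html></html>), mirroring what a UI would observe if the call returned instantly.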

Alternative fixes (not recommended)

These are the alternative ways to work around this issue, with explanations of why we don't recommend any of them. Some of these fixes we explicitly recommended before, but now that our understanding of the use case has deepened, we can confidently advise against them.

UnconfinedTestDispatcher

We can work around the issue if we replace the line

    val collector = launch {

with

    val collector = launch(UnconfinedTestDispatcher(testScheduler)) {

Being a test-framework-specific equivalent of Dispatchers.Unconfined, UnconfinedTestDispatcher ensures that each time an action to be executed on that dispatcher is enqueued (like a new value being available on the flow that's being collected), that action happens immediately.
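For reference, here is the first example from this issue reduced to this workaround. It is a sketch that assumes the test body itself runs on the default StandardTestDispatcher; the helper name is made up for illustration.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest

// Hypothetical helper: returns the values seen by a collector on UnconfinedTestDispatcher.
@OptIn(ExperimentalCoroutinesApi::class)
fun valuesSeenByUnconfinedCollector(): List<Int> {
    val seen = mutableListOf<Int>()
    runTest {
        val flow = MutableStateFlow(0)
        // The collector starts eagerly and is resumed in place on every assignment.
        val collector = launch(UnconfinedTestDispatcher(testScheduler)) {
            flow.collect { seen += it }
        }
        repeat(10) { flow.value = it + 1 }
        collector.cancel()
    }
    return seen
}
```

Here, the collector should observe every value from 0 to 10 without the test body ever suspending. The sketch does not cover the case where the assignments themselves are made from an unconfined dispatcher.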

This approach works. However, it's unintuitive, requires a deep dive into coroutines to understand, and is also a bit brittle: if assignments to the StateFlow also happen from an unconfined dispatcher, this behavior is not guaranteed. This is a realistic concern:

  • Many tests are written like runTest(UnconfinedTestDispatcher()) to ensure all launch calls are entered immediately.
  • The Android-specific ViewModelScope uses Dispatchers.Main.immediate, which behaves like Dispatchers.Unconfined when used from the main thread. So, any test for a ViewModelScope is susceptible to this.

See also #3760

A custom CoroutineDispatcher

One can tweak the UnconfinedTestDispatcher approach by defining their own coroutine dispatcher to deal with this case robustly:

object DirectCoroutineDispatcher: CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) { block.run() }
}

Then,

    val collector = launch(DirectCoroutineDispatcher) {

will ensure the collection is happening immediately, even if runTest as a whole runs in UnconfinedTestDispatcher.

This is clearly unreadable to anyone not well-versed in coroutines, and it gives up the delay-skipping behavior: if collector used a delay inside, it would be performed with the real-time clock, making the test take longer. In addition, and most importantly, no dispatcher used in real life behaves like this, so any test written with DirectCoroutineDispatcher risks testing behavior that can't actually happen in production. What is the point of testing behavior you can never observe?

yield()

A common approach is to add a yield() after each assignment to make sure the test thread gets the chance to process it. In general, yield should not be used to ensure a specific execution order: it only allows other code to run until its next suspension, and the number of suspensions a function goes through during its execution is an implementation detail that may change. If one goes down the yield() path, they may end up with strange constructions like yield(); yield(); yield() // ensure everyone processed the change, which conveys the intent far less clearly than delay(25.milliseconds) // emulate a long network call.

Semantically, yield is supposed to mean, "let this thread do some other work," without defining exactly which work and how much of it is being done. The behavior of yield is predictable, but tracing through the interleavings of coroutines manually is too much work when reading a simple test.
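To illustrate how easy it is to get the interleaving wrong, here is a sketch of the first example with a yield() after each assignment. The helper name is made up, and the result described below depends on the test body running on the default StandardTestDispatcher.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield

// Hypothetical helper: returns the values a collector sees when the producer yields after each write.
fun valuesSeenWithYield(): List<Int> {
    val seen = mutableListOf<Int>()
    runTest {
        val flow = MutableStateFlow(0)
        val collector = launch { flow.collect { seen += it } }
        repeat(10) {
            flow.value = it + 1
            yield() // let the collector process the value that was just assigned
        }
        collector.cancel()
    }
    return seen
}
```

In this sketch, the collector should see 1 through 10 but not the initial 0: the first assignment happens before the collector has started, so catching 0 would require one more yield() before the loop.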

runCurrent() / advanceUntilIdle()

Instead of yield, one can use runCurrent(). It's slightly better, as it doesn't rely on performing an exact number of context switches: depending on your needs, it can work as yield() or as yield(); yield(); yield(), because all the work that's scheduled to execute at the current moment will finish by the end of the call.

There are also downsides to runCurrent(). Notably, for this use case, it's a blocking call that can't be cancelled. See #3919 for more details and a discussion.
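For comparison with the other workarounds, this is how the first example could look with runCurrent(). It is only a sketch with a made-up helper name, and it inherits the downsides described above.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Hypothetical helper: returns the values a collector sees when runCurrent() follows each write.
@OptIn(ExperimentalCoroutinesApi::class)
fun valuesSeenWithRunCurrent(): List<Int> {
    val seen = mutableListOf<Int>()
    runTest {
        val flow = MutableStateFlow(0)
        val collector = launch { flow.collect { seen += it } }
        runCurrent() // let the collector start and observe the initial value
        repeat(10) {
            flow.value = it + 1
            runCurrent() // run everything scheduled for the current virtual time
        }
        collector.cancel()
    }
    return seen
}
```

Under these assumptions, the collector should observe every value from 0 to 10, at the cost of a blocking call after each assignment.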

What should we do?

We should update the documentation with a clear explanation of what to do in this typical scenario. For now, we'll keep this issue open for some time to collect feedback on the proposed approach.
