Skip to content

GrpcStreamingCall() factory's executeIn() hardcodes Dispatchers.IO instead of using an incoming scope #3559

@SaqibJDev

Description

@SaqibJDev

Summary

GrpcStreamingCall() factory's executeIn() hardcodes Dispatchers.IO, ignoring the scope's dispatcher — breaks kotlinx-coroutines-test virtual time

Description

The GrpcStreamingCall() convenience factory function in GrpcCalls.kt hardcodes Dispatchers.IO in its executeIn() implementation:

override fun executeIn(scope: CoroutineScope): Pair<SendChannel<S>, ReceiveChannel<R>> {
    check(executed.compareAndSet(false, true)) { "already executed" }

    val job = scope.launch(Dispatchers.IO) {  // ← ignores scope's dispatcher
        try {
            function(requestChannel, responseChannel)
        } catch (e: Exception) {
            requestChannel.close(e)
            responseChannel.close(e)
        }
    }
    // ...
}

While scope.launch(Dispatchers.IO) correctly makes the coroutine a child of scope (preserving structured concurrency for cancellation), it overrides the scope's dispatcher. This means the provided lambda always runs on real Dispatchers.IO threads, regardless of what dispatcher the scope carries.

This makes GrpcStreamingCall() unusable in unit tests that use kotlinx-coroutines-test's virtual time (runTest, TestDispatcher, TestScheduler), which is the standard approach for testing coroutine-based code.

The Problem

When using the GrpcStreamingCall() factory as a test fake, the lambda (mock server logic) runs on Dispatchers.IO with real wall-clock time, while the production code under test runs on a TestDispatcher with virtual time.

runTest auto-advances virtual time when coroutines are suspended. Any withTimeoutOrNull() or delay() in the production code fires instantly in virtual time — before the real IO thread has had a chance to process and respond. This creates a race condition that causes flaky tests.

Reproducer

@Test
fun `flaky test due to Dispatchers_IO override`() = runTest {
    val testDispatcher = UnconfinedTestDispatcher(testScheduler)
    val testScope = CoroutineScope(testDispatcher + SupervisorJob())

    // This factory hardcodes Dispatchers.IO in executeIn()
    val grpcCall = GrpcStreamingCall<MyRequest, MyResponse> { requests, responses ->
        val request = requests.receive()
        // Simulate server processing
        responses.send(MyResponse(success = true))
        responses.close()
    }

    val (sendChannel, receiveChannel) = grpcCall.executeIn(testScope)

    // Send a request
    sendChannel.send(MyRequest())

    // This timeout uses virtual time (TestScheduler) — runTest auto-advances it.
    // But the GrpcStreamingCall lambda runs on Dispatchers.IO (real time).
    // Virtual time advances past 3000ms BEFORE the IO thread sends the response.
    val response = withTimeoutOrNull(3000) {
        receiveChannel.receiveAsFlow().first()
    }

    // FLAKY: sometimes null (timeout fired before IO thread responded),
    // sometimes non-null (IO thread was fast enough)
    assertNotNull(response)

    testScope.cancel()
}

Expected: The test passes deterministically because the scope's UnconfinedTestDispatcher is respected, keeping everything on virtual time.

Actual: The test is flaky. runTest auto-advances virtual time past the 3000ms timeout before the Dispatchers.IO thread delivers the response. The test intermittently fails depending on real-time thread scheduling.

Suggested Fix

Respect the scope's dispatcher by removing the Dispatchers.IO override in executeIn():

override fun executeIn(scope: CoroutineScope): Pair<SendChannel<S>, ReceiveChannel<R>> {
    check(executed.compareAndSet(false, true)) { "already executed" }

-   val job = scope.launch(Dispatchers.IO) {
+   val job = scope.launch {
        try {
            function(requestChannel, responseChannel)
        } catch (e: Exception) {
            requestChannel.close(e)
            responseChannel.close(e)
        }
    }

This way:

  • Production callers passing a scope with Dispatchers.IO get the same behavior as before.
  • Test callers passing a scope with TestDispatcher get deterministic virtual-time execution.
  • The executeIn(scope) API contract is honored — the function truly executes in the given scope's context.

Note: GrpcCall() (the unary variant) does not have this problem — it executes synchronously on the calling thread. Only GrpcStreamingCall() is affected.

Workaround

Mock executeIn() to avoid the factory entirely, managing channels manually on the test dispatcher:

val requestChannel = Channel<MyRequest>(1)
val responseChannel = Channel<MyResponse>(1)

val grpcCall = mockk<GrpcStreamingCall<MyRequest, MyResponse>>()
every { grpcCall.executeIn(any()) } answers {
    val scope = firstArg<CoroutineScope>()
    scope.launch {
        val request = requestChannel.receive()
        responseChannel.send(MyResponse(success = true))
        requestChannel.cancel()
        responseChannel.close()
    }
    requestChannel to responseChannel
}

This keeps the handler on the scope's dispatcher, eliminating the real-time vs virtual-time race.

Environment

  • Wire version: 5.3.3, 5.4.0, 5.5.0 (also confirmed present in 6.0.0-alpha02)
  • kotlinx-coroutines-test: 1.7+
  • Kotlin: 1.9+

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions