Skip to content

SendChannel.trySendBlocking() hangs thread upon callbackFlow channel close when unbuffered or buffer full #3952

Open
@bubenheimer

Description

@bubenheimer

Describe the bug

SendChannel.trySendBlocking() hangs the calling thread indefinitely when an unbuffered callbackFlow channel closes. Instead I expect trySendBlocking() to fail with an appropriate channel-related exception in a timely fashion once the channel closes.

Coroutines version 1.7.3

Provide a Reproducer

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

var callback: ((Int) -> Unit)? = null

val thread = Thread {
    repeat(Int.MAX_VALUE) {
        println("Calling $it")
        callback!!(it)
    }
}

val flow = callbackFlow {
    callback = {
        println("Sending $it")
        try {
            trySendBlocking(it).exceptionOrNull()?.let { println(it) }
        } catch (t: Throwable) {
            println("Send fail: $t")
            throw t
        }
        println("Sent")
    }

    thread.start()

    awaitClose {
        GlobalScope.launch {
            repeat(5) {
                println("channel.isClosedForSend: ${channel.isClosedForSend}")
                delay(1_000L)
            }
        }
    }
}

runBlocking {
    val job = launch {
        flow
                .buffer(RENDEZVOUS)
                .transform {
                    emit(it)
                    delay(1_000L)
                }
                .onCompletion { println("Flow collection complete: $it") }
                .collect {
                    println("Received $it")
                }
    }

    println("Delaying")
    delay(5_000L)
    println("Cancelling")
    job.cancel()
    println("Delaying")
    delay(10_000L)
    println("Interrupting")
    thread.interrupt()
    println("Delaying")
    delay(10_000L)
    println("Done")
}

Output below:

Delaying
Calling 0
Sending 0
Sent
Calling 1
Sending 1
Received 0
Received 1
Sent
Calling 2
Sending 2
Received 2
Sent
Calling 3
Sending 3
Received 3
Sent
Calling 4
Sending 4
Received 4
Sent
Calling 5
Sending 5
Cancelling
Delaying
channel.isClosedForSend: false
Flow collection complete: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@77b987f3
channel.isClosedForSend: true
channel.isClosedForSend: true
channel.isClosedForSend: true
channel.isClosedForSend: true
Interrupting
Delaying
Send fail: java.lang.InterruptedException
Done

The output shows that the callback thread hangs after the channel closes until it is forcibly interrupted from another thread.

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions