Use case
I have a flow of data that is sent to an HTTP stream. I want to have keep-alive pings in this stream, i.e. send an empty event if nothing is coming through the flow.
So I want a stream transformer that calls a function every time nothing has arrived for long enough.
The Shape of the API
I've implemented it as the following utility function:
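import kotlin.time.Duration
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

@OptIn(ExperimentalCoroutinesApi::class)
public fun <T> Flow<T>.onIdle(interval: Duration, block: suspend ProducerScope<T>.() -> Unit) = channelFlow {
    // Collect the upstream flow into a channel so it can be used in select.
    val data = produceIn(this)
    var finished = false
    while (!finished) {
        select {
            data.onReceiveCatching {
                if (it.isSuccess) {
                    // Forward the upstream element unchanged.
                    send(it.getOrThrow())
                } else {
                    // The upstream channel is closed: stop the loop.
                    finished = true
                }
            }
            // Nothing arrived within the interval: run the idle callback.
            onTimeout(interval) {
                block()
            }
        }
    }
}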
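For illustration, here is a minimal usage sketch of the keep-alive case, assuming an empty string stands in for the "empty event". The names `collectDemo` and `events` are hypothetical and exist only for this example. The `onIdle` definition is repeated so the snippet compiles on its own, with an explicit `select<Unit>` type argument and return type added as a precaution.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

// Same operator as in the issue, repeated so the example is self-contained.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.onIdle(
    interval: Duration,
    block: suspend ProducerScope<T>.() -> Unit
): Flow<T> = channelFlow {
    val data = produceIn(this)
    var finished = false
    while (!finished) {
        select<Unit> {
            data.onReceiveCatching {
                if (it.isSuccess) send(it.getOrThrow()) else finished = true
            }
            onTimeout(interval) { block() }
        }
    }
}

// Hypothetical demo: two real events separated by a quiet period.
fun collectDemo(): List<String> = runBlocking {
    val events = flow {
        emit("a")
        delay(350) // quiet period, longer than the idle interval
        emit("b")
    }
    events
        .onIdle(100.milliseconds) { send("") } // "" is the assumed keep-alive event
        .toList()
}

fun main() {
    // Prints "a" and "b" with empty keep-alive entries in between.
    println(collectDemo())
}
```

The exact number of keep-alive entries depends on timing, because the timeout restarts on every loop iteration, i.e. after each received element and after each idle callback.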