Description
Hello,
I'm trying to solve a problem perfectly with the help of channelFlow { … }
, but send(…)
's unavoidable prompt-cancellation guarantee is getting in the way.
Use case:
In the snippet below, I want to be able to know if newValue
was successfully delivered, and if so, I want to ensure lastEmitElapsedNanos
is set.
Unfortunately, because of Channel
's send(…)
prompt cancellation guarantee, it's impossible to know if cancellation happened before or after the channelFlow
received the value and passed it downstream.
The onUndeliveredElement
parameter of Channel
can't be set from channelFlow
, nor callbackFlow
, and even if it was possible, it's be called in the wrong scope.
Here, I need a variant of send
that checks cancellation only until the value is accepted: if cancellation happens as the value was just delivered, I want it to not throw CancellationException
, and let me keep the fact that the value was passed downstream. A sort of "atomic" behavior on exit.
fun <T> Flow<T>.rateLimit(minInterval: Duration): Flow<T> = channelFlow {
var lastEmitElapsedNanos = 0L
collectLatest { newValue ->
val nanosSinceLastEmit = SystemClock.elapsedRealtimeNanos() - lastEmitElapsedNanos
val timeSinceLastEmit = nanosSinceLastEmit.nanoseconds
val timeToWait = minInterval - timeSinceLastEmit.coerceAtMost(minInterval)
delay(timeToWait)
send(newValue) // We don't handle the case where cancellation happens after it's received.
lastEmitElapsedNanos = SystemClock.elapsedRealtimeNanos()
}
}.buffer(Channel.RENDEZVOUS)
What do we have now?
send
with prompt cancellation guarantees, but with no way to know if the value was actually delivered when the coroutine calling send
can be cancelled.
What should be instead?
send
could have a sendAtomic
variant, or something, that doesn't propagate cancellation if the value was just delivered.
Why?
I'm not aware of any workaround at the moment.
The code above would become instantly correct after using the new "atomic" version, instead of not updating lastEmitElapsedNanos
when newValue
was delivered while the receiver flow got a new value that made collectLatest
cancel the sub-coroutine.
Why not?
Not a breaking change if added as an overload (e.g. sendAtomic(…)
or send(…, atomic = true)
.
Potentially breaking behavior if send
is changed instead.