See #1315 for one of the use-cases. Note that this operator can be implemented in quite a straightforward way using stable Flow APIs. A simple implementation is given below:
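import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun <A, B : Any, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> = flow {
    coroutineScope {
        // Holds the most recent value of `other`; null until it emits for the first time
        val latestB = AtomicReference<B?>()
        val outerScope = this
        // Collect `other` concurrently, only remembering its latest value
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e) // cancel outer scope on cancellation exception, too
            }
        }
        // Only values of the receiver flow trigger emissions;
        // they are dropped while `other` has not emitted anything yet
        collect { a: A ->
            latestB.get()?.let { b -> emit(transform(a, b)) }
        }
    }
}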
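To illustrate the behavior, here is a small timing-based sketch that runs the operator next to the existing `combine` on the same inputs. The operator is repeated from this issue so that the snippet runs on its own. The `numbers` and `letters` flows and their delays are made up for the example, and the outputs noted in the comments are what these timings should produce, not a guarantee under heavy load.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Copy of the implementation from this issue, repeated so the example is self-contained.
fun <A, B : Any, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> = flow {
    coroutineScope {
        val latestB = AtomicReference<B?>()
        val outerScope = this
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e)
            }
        }
        collect { a: A ->
            latestB.get()?.let { b -> emit(transform(a, b)) }
        }
    }
}

// Hypothetical source: emits 1, 2, 3 at roughly 100, 200 and 400 ms.
fun numbers(): Flow<Int> = flow {
    delay(100); emit(1)
    delay(100); emit(2)
    delay(200); emit(3)
}

// Hypothetical source: emits "a" immediately and "b" at roughly 300 ms, then completes.
fun letters(): Flow<String> = flow {
    emit("a")
    delay(300); emit("b")
}

fun main(): Unit = runBlocking {
    // Only `numbers` triggers emissions; `letters` is merely sampled.
    val latest = numbers().withLatestFrom(letters()) { n, s -> "$n$s" }.toList()
    // `combine` reacts to both flows, so the arrival of "b" re-emits with the last number.
    val combined = numbers().combine(letters()) { n, s -> "$n$s" }.toList()
    println(latest)   // expected with these timings: [1a, 2a, 3b]
    println(combined) // expected with these timings: [1a, 2a, 2b, 3b]
}
```

Note that `letters` is finite here. With this implementation the enclosing `coroutineScope` waits for the collector of `other`, so a never-completing `other` would keep the resulting flow from completing.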
TODO: We need to figure out a name for this operation that is consistent with the combine and xxxLatest naming convention. Even better, it should be a part of the customizable "combine family" of operations where each source flow can either react to incoming values or just reference the most recent value. It is also related to #1354 and #1082, which would introduce the concept of StateFlow (a flow with the most recent value).
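As a rough sketch of why a StateFlow-like type is relevant here: if the second source always exposes its most recent value, the operator no longer needs a background collector. The snippet below assumes a `StateFlow` with a readable `value` property, as found in current kotlinx.coroutines releases; the helper name `withLatestFromState` is hypothetical and not part of any proposal in this issue.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: since a StateFlow always holds a current value,
// it can simply be read whenever the receiver flow emits.
fun <A, B, R> Flow<A>.withLatestFromState(
    other: StateFlow<B>,
    transform: suspend (A, B) -> R
): Flow<R> = map { a -> transform(a, other.value) }

fun main(): Unit = runBlocking {
    val state = MutableStateFlow("a")
    val result = flowOf(1, 2, 3)
        .onEach { if (it == 3) state.value = "b" } // update the state just before the third element
        .withLatestFromState(state) { n, s -> "$n$s" }
        .toList()
    println(result) // [1a, 2a, 3b]
}
```

Unlike the implementation above, this variant never drops elements, because a StateFlow has an initial value by construction.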