Feature: Flow.conflate with custom callback for merging elements #3884

Open
@kunyavskiy

Description

Use case

I have some data that is represented as a flow of updates, and I need to compute a StateFlow of some derived data, which can essentially be thought of as Map<Int, SomeOtherData>.

Basic implementation would look like the following:

suspend fun compute(scope: CoroutineScope, updates: Flow<DataUpdate>) = updates
    .runningFold(emptyData()) { data, update -> data.merge(update) }
    .map { data -> data.getKeys().associateWith { slowComputation(it, data) } }
    .stateIn(scope)

Unfortunately, this code is too slow. There are two optimizations I want to implement.

  1. Since StateFlow conflates anyway, we can add a conflate() call before the slow computation to avoid some recomputation.
  2. It's possible to determine which keys need to be recomputed for a given update, and that is sometimes quite a small set. So we could have something like:
suspend fun compute(scope: CoroutineScope, updates: Flow<DataUpdate>) = updates
    .runningFold(emptyData() to persistentMapOf<...>()) { (data, oldResult), update -> 
      val newData = data.merge(update)
      val newResult = oldResult.putAll(update.getKeys().associateWith { slowComputation(it, newData) })
      newData to newResult
    }
    .map { it.second }
    .stateIn(scope)
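
For comparison, the first optimization on its own might look like the sketch below. `DataUpdate`, `Data` and the body of `slowComputation` are hypothetical stand-ins invented for illustration (the issue does not define them); only the position of `conflate()` matters.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the issue's types; only their shape matters here.
data class DataUpdate(val key: Int, val value: Int)

data class Data(val values: Map<Int, Int> = emptyMap()) {
    fun merge(update: DataUpdate) = Data(values + (update.key to update.value))
    fun getKeys(): Set<Int> = values.keys
}

// Placeholder for the expensive per-key computation.
fun slowComputation(key: Int, data: Data): Int = data.values.getValue(key) * 2

suspend fun compute(scope: CoroutineScope, updates: Flow<DataUpdate>): StateFlow<Map<Int, Int>> =
    updates
        .runningFold(Data()) { data, update -> data.merge(update) }
        .conflate() // keep only the latest snapshot while the map step is busy
        .map { data -> data.getKeys().associateWith { slowComputation(it, data) } }
        .stateIn(scope)
```

Dropping intermediate snapshots is safe here because every snapshot is complete on its own; the map step still recomputes every key each time, which is what the second optimization tries to avoid.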

The problem is that I see no way to implement both of them at once: after the second optimization, each result is built on top of the previous one, so intermediate recalculations can no longer be dropped.

The Shape of the API

I think it would be nice to have a function like fun <T> Flow<T>.conflate(merge: (T, T) -> T) with the semantics "If a new element arrives before the old one is received downstream, they are both replaced with merge(old, new)".

In my case, this would allow merging a bunch of updates that arrived while slowComputation was running into one update, avoiding intermediate recalculations while still recalculating only the data affected by that bunch.
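
To make the merge callback concrete, here is a minimal sketch. It assumes, hypothetically, that each element carries the latest data snapshot together with the set of keys changed since the last downstream emission; `Pending` and `mergePending` are illustrative names, not part of the issue or of kotlinx.coroutines.

```kotlin
// Hypothetical element type: a data snapshot plus the keys changed
// since the last element that was delivered downstream.
data class Pending<D>(val data: D, val dirtyKeys: Set<Int>)

// Candidate `merge` callback: the newer snapshot wins, dirty keys accumulate.
fun <D> mergePending(old: Pending<D>, new: Pending<D>): Pending<D> =
    Pending(new.data, old.dirtyKeys + new.dirtyKeys)
```

Three updates that arrive during one slow computation would then collapse into a single element holding the newest snapshot and the union of their keys, so downstream recomputes each affected key once.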

I failed to implement exactly this function based on the conflate implementation, as I don't really understand what ChannelFlowOperator is. But I have a prototype of conflate + map based on StateFlow:

fun <T, R> Flow<T>.conflatedMap(merge: (T, T) -> T, process: (T) -> R) = flow {
    coroutineScope {
        val local = MutableStateFlow<T?>(null)
        launch {
            collect { new ->
                local.update { old -> if (old == null) new else merge(old, new) }
            }
        }
        while (true) {
            val data = local.getAndUpdate { null } ?: local.filterNotNull().first().let { local.getAndUpdate { null } }
            emit(process(data))
        }
    }
}

But it has a lot of issues. For example, it doesn't propagate the end of the flow correctly, doesn't handle nullable types correctly, and I'm not sure it works correctly if merge or process throws. It seems that implementing it fully correctly would require a deeper understanding of flow invariants than I have.
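
One possible direction, offered as a sketch rather than a vetted implementation: build the operator from the public `channelFlow` and `select` APIs instead of ChannelFlowOperator. The names `conflateMerging` and `NoValue` are hypothetical. The sketch assumes a rendezvous hand-off to the collector, uses a marker object so that T may be nullable, ends the flow once upstream completes and the last pending element is delivered, and lets exceptions from upstream or from merge fail the flow. It has not been checked against the library's operator fusion rules.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.select

// Marker for "nothing pending"; using it instead of null lets T itself be nullable.
private object NoValue

// Hypothetical name; a sketch of the proposed conflate(merge), not a library API.
@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.conflateMerging(merge: (T, T) -> T): Flow<T> = channelFlow {
    val upstream = Channel<T>() // rendezvous: hands over one upstream element at a time
    launch {
        try {
            this@conflateMerging.collect { upstream.send(it) }
            upstream.close() // normal completion of the upstream flow
        } catch (e: Throwable) {
            upstream.close(e) // upstream failure or cancellation
            throw e
        }
    }
    var pending: Any? = NoValue
    var upstreamOpen = true
    while (upstreamOpen || pending !== NoValue) {
        select<Unit> {
            if (pending !== NoValue) {
                // Selected only when the downstream collector is ready to receive.
                channel.onSend(pending as T) { pending = NoValue }
            }
            if (upstreamOpen) {
                upstream.onReceiveCatching { result ->
                    if (result.isSuccess) {
                        val new = result.getOrThrow()
                        pending = if (pending === NoValue) new else merge(pending as T, new)
                    } else {
                        result.exceptionOrNull()?.let { throw it }
                        upstreamOpen = false
                    }
                }
            }
        }
    }
}.buffer(Channel.RENDEZVOUS) // no extra buffering between the merge loop and the collector
```

Under these assumptions the prototype above would reduce to `conflateMerging(merge).map(process)`. Because the pending element is offered through `onSend` inside the same `select` that receives from upstream, it keeps absorbing new elements until the collector actually takes it.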
