Skip to content

StateFlow-like primitive for big states with delta updates #3316

Open
@elizarov

Description

@elizarov

A common problem often appears when writing applications with a state flow in cases when a state is quite a big object, for example some kind of a list of events or other entities, a map, or something similar. These kinds of states are usually transferred over network as a one-time snapshot followed by incremental updates, and there is no ready-to-use primitive to represent them as a StateFlow.

A solution shall cleanly address two sides of the problem:

Ingress: turning a snapshot of the current state plus a sequence of updates into some kind of Flow representation.
Egress: turning a Flow representation into a snapshot of a current state, plus a sequence of updates.

A trivial solution is to use StateFlow<State> as a flow representation, which might be acceptable for moderately-sized state objects. In this representation, the ingress problem is solved like this:

val flow: StaetFlow<State> = flow<State> {
    var currentState = getCurrentStateSnapshot()
    emit(currentState)
    forEachIncomingUpdate { update -> 
        currentState = currentState.apply(update) // compute updated state
        emit(currentState) // emit updated state
    }
}.stateIn(scope) // convert into StateFlow<State>

And the egress problem can be solved like this:

var lastState = emptyState() // domain-specific empty state
flow.collect { state -> 
    val update = state.diffFrom(lastState) // compute delta between states
    processUpdate(update)
    lastState = state
}

This implementation assumes that we have fun State.diffFrom(other: State): Update that computes a difference between two states and represents them as an update. This is usually possible and would work fine as long as the states are not very big. However, this does not scale easily. If state big (for example, a list of 100s of thousands of items), then typically-quadratic implementation of apply and diffFrom functions will make it too slow to be used in practice.

An efficient design will work somewhat like a combination of StateFlow<State> and SharedFlow<Update>. It will rely on the apply function only and will internally maintain a current state snapshot to be delivered to the new subscribed, followed by the corresponding updates in sync. To make this kind of design representable as a Flow I would suggest to limit ourselves to case where State <: Update (a value space of states is a subset of a value space of update) or, saying it another way, only to cases where sending a n State snapshot is just one kind of Update operation. We will represent the result as a Flow<Update> and will need a new kind of name for this primitive. Let's call it, tentatively, UpdateFlow for now.

A sketch of the corresponding API might look like this. On ingress side one writes:

val flow: UpdateFlow<Update> = flow<Update> { 
    emit(getCurrentStateSnapshot()) // Note: State <: Update
    forEachIncomingUpdate { update -> emit(update) }
}.updateIn(state, ::apply) // convert into UpdateFlow<State>

The apply will need to a type of (Update, Update) -> Update), that is a function that is capable of merging a snapshot plus update, and an arbitrary pair of updates into an update. It will be used internally to conflate updates for slow subscribers.

On egress side one writes:

flow.collect { update -> 
    // Note: the first update is guaranteed to be a snapshot
    processUpdate(update) 
}

All names in this "design" as TBD

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions