Skip to content

MutableSharedFlow rendezvous #2818

Open
@Dominaezzz

Description

@Dominaezzz

I expected this bit of code to deadlock. (The onEach is a workaround for #2817)

val stream = MutableSharedFlow<Unit>()
stream/* .onEach {} */.buffer(Channel.RENDEZVOUS).produceIn(this)
yield()
stream.emit(Unit)
// stream.emit(Unit)

This did not deadlock until I commented out the second emission.

Is this by design? The docs don't seem to suggest this.

[emit][MutableSharedFlow.emit] call to such a shared flow suspends until all subscribers receive the emitted value

I haven't received anything from the channel so this shouldn't happen.

I need this behaviour so I need to decide between using a workaround or a re-design?
If this won't be fixed then I'll redesign but if it will be fixed/implemented then I justify using a workaround.

Mini investigation

So API aside, I had a look at the code and the culprit seems to be in SharedFlow.collect.

while (true) {
var newValue: Any?
while (true) {
newValue = tryTakeValue(slot) // attempt no-suspend fast path first
if (newValue !== NO_VALUE) break
awaitValue(slot) // await signal that the new value is available
}
collectorJob?.ensureActive()
collector.emit(newValue as T)
}
.

tryTakeValue/awaitValue "pops" the value out of the buffer for the current subscriber, (which effectively counts as the receipt the docs talked about) then the value is emitted to the collector.

Looking at tryTakeValue.

private fun tryTakeValue(slot: SharedFlowSlot): Any? {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val value = synchronized(this) {
val index = tryPeekLocked(slot)
if (index < 0) {
NO_VALUE
} else {
val oldIndex = slot.index
val newValue = getPeekedValueLockedAt(index)
slot.index = index + 1 // points to the next index after peeked one
resumes = updateCollectorIndexLocked(oldIndex)
newValue
}
}
for (resume in resumes) resume?.resume(Unit)
return value
}

In theory, that could be changed to a two step process, one step to peek the value in the buffer (i.e. getPeekedValueLockedAt), then the value is emitted and another step to pop/release the value in the buffer (i.e. slot index increment and updateCollectorIndexLocked).

It means locking twice instead of once but it's probably best to measure than listen to whatever I was about hypothesise here lol.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions