Skip to content

Commit 9587590

Browse files
authored
Combine and zip rework (#2308)
* Rework Flow.zip operator: improve its performance by 40%, collect one of the upstreams in the same coroutine as emitter * Rework Flow.combine * Get rid of two code paths * Get rid of accidental O(N^2) where N is the number of flows that caused #2296 * Get rid of select that hits performance hard, improving performance by 50% in the pessimistic case * Get rid of crossinlines in API and implementation to fix Android issues * Make combine fairer and its results less surprising in sequential scenarios * Improve stacktrace recovery and stackwalking for SafeCollector, flowOn and zip operators * Update JMH Fixes #1743 Fixes #1683 Fixes #2296
1 parent 9eaa9c6 commit 9587590

File tree

21 files changed

+551
-231
lines changed

21 files changed

+551
-231
lines changed

Diff for: benchmarks/build.gradle.kts

+2-28
Original file line numberDiff line numberDiff line change
@@ -31,38 +31,12 @@ tasks.named<KotlinCompile>("compileJmhKotlin") {
3131
}
3232
}
3333

34-
/*
35-
* Due to a bug in the inliner it sometimes does not remove inlined symbols (that are later renamed) from unused code paths,
36-
* and it breaks JMH that tries to post-process these symbols and fails because they are renamed.
37-
*/
38-
val removeRedundantFiles by tasks.registering(Delete::class) {
39-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
40-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$1.class")
41-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$1.class")
42-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$1.class")
43-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
44-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
45-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
46-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class")
47-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class")
48-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class")
49-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
50-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class")
51-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
5234

53-
// Primes
54-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class")
55-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class")
56-
}
57-
58-
tasks.named("jmhRunBytecodeGenerator") {
59-
dependsOn(removeRedundantFiles)
60-
}
6135

6236
// It is better to use the following to run benchmarks, otherwise you may get unexpected errors:
6337
// ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark"
6438
extensions.configure<JMHPluginExtension>("jmh") {
65-
jmhVersion = "1.21"
39+
jmhVersion = "1.26"
6640
duplicateClassesStrategy = DuplicatesStrategy.INCLUDE
6741
failOnError = true
6842
resultFormat = "CSV"
@@ -80,7 +54,7 @@ tasks.named<Jar>("jmhJar") {
8054
}
8155

8256
dependencies {
83-
compile("org.openjdk.jmh:jmh-core:1.21")
57+
compile("org.openjdk.jmh:jmh-core:1.26")
8458
compile("io.projectreactor:reactor-core:${version("reactor")}")
8559
compile("io.reactivex.rxjava2:rxjava:2.1.9")
8660
compile("com.github.akarnokd:rxjava2-extensions:0.20.8")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import org.openjdk.jmh.annotations.*
10+
import java.util.concurrent.*
11+
12+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
13+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
14+
@Fork(value = 1)
15+
@BenchmarkMode(Mode.Throughput)
16+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
17+
@State(Scope.Benchmark)
18+
open class CombineFlowsBenchmark {
19+
20+
@Param("10", "100", "1000")
21+
private var size = 10
22+
23+
@Benchmark
24+
fun combine() = runBlocking {
25+
combine((1 until size).map { flowOf(it) }) { a -> a}.collect()
26+
}
27+
28+
@Benchmark
29+
fun combineTransform() = runBlocking {
30+
val list = (1 until size).map { flowOf(it) }.toList()
31+
combineTransform((1 until size).map { flowOf(it) }) { emit(it) }.collect()
32+
}
33+
}
34+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlinx.coroutines.flow.internal.*
10+
import org.openjdk.jmh.annotations.*
11+
import java.util.concurrent.*
12+
13+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
14+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
15+
@Fork(value = 1)
16+
@BenchmarkMode(Mode.Throughput)
17+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
18+
@State(Scope.Benchmark)
19+
open class CombineTwoFlowsBenchmark {
20+
21+
@Param("100", "100000", "1000000")
22+
private var size = 100000
23+
24+
@Benchmark
25+
fun combinePlain() = runBlocking {
26+
val flow = (1 until size.toLong()).asFlow()
27+
flow.combine(flow) { a, b -> a + b }.collect()
28+
}
29+
30+
@Benchmark
31+
fun combineTransform() = runBlocking {
32+
val flow = (1 until size.toLong()).asFlow()
33+
flow.combineTransform(flow) { a, b -> emit(a + b) }.collect()
34+
}
35+
36+
@Benchmark
37+
fun combineVararg() = runBlocking {
38+
val flow = (1 until size.toLong()).asFlow()
39+
combine(listOf(flow, flow)) { arr -> arr[0] + arr[1] }.collect()
40+
}
41+
42+
@Benchmark
43+
fun combineTransformVararg() = runBlocking {
44+
val flow = (1 until size.toLong()).asFlow()
45+
combineTransform(listOf(flow, flow)) { arr -> emit(arr[0] + arr[1]) }.collect()
46+
}
47+
}

Diff for: benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,14 @@ open class NumbersBenchmark {
7777

7878
@Benchmark
7979
fun zipRx() {
80-
val numbers = rxNumbers().take(natural.toLong())
80+
val numbers = rxNumbers().take(natural)
8181
val first = numbers
8282
.filter { it % 2L != 0L }
8383
.map { it * it }
8484
val second = numbers
8585
.filter { it % 2L == 0L }
8686
.map { it * it }
87-
first.zipWith(second, BiFunction<Long, Long, Long> { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count()
87+
first.zipWith(second, { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count()
8888
.blockingGet()
8989
}
9090

@@ -98,7 +98,7 @@ open class NumbersBenchmark {
9898

9999
@Benchmark
100100
fun transformationsRx(): Long {
101-
return rxNumbers().take(natural.toLong())
101+
return rxNumbers().take(natural)
102102
.filter { it % 2L != 0L }
103103
.map { it * it }
104104
.filter { (it + 1) % 3 == 0L }.count()

Diff for: kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

-8
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,6 @@ internal abstract class AbstractSendChannel<E>(
137137
return sendSuspend(element)
138138
}
139139

140-
internal suspend fun sendFair(element: E) {
141-
if (offerInternal(element) === OFFER_SUCCESS) {
142-
yield() // Works only on fast path to properly work in sequential use-cases
143-
return
144-
}
145-
return sendSuspend(element)
146-
}
147-
148140
public final override fun offer(element: E): Boolean {
149141
val result = offerInternal(element)
150142
return when {

Diff for: kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

-5
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,4 @@ internal open class ChannelCoroutine<E>(
3434
_channel.cancel(exception) // cancel the channel
3535
cancelCoroutine(exception) // cancel the job
3636
}
37-
38-
@Suppress("UNCHECKED_CAST")
39-
suspend fun sendFair(element: E) {
40-
(_channel as AbstractSendChannel<E>).sendFair(element)
41-
}
4237
}

Diff for: kotlinx-coroutines-core/common/src/flow/Migration.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -367,35 +367,35 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen
367367
message = "Flow analogue of 'combineLatest' is 'combine'",
368368
replaceWith = ReplaceWith("combine(this, other, other2, transform)")
369369
)
370-
public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
370+
public fun <T1, T2, T3, R> Flow<T1>.combineLatest(
371371
other: Flow<T2>,
372372
other2: Flow<T3>,
373-
crossinline transform: suspend (T1, T2, T3) -> R
373+
transform: suspend (T1, T2, T3) -> R
374374
) = combine(this, other, other2, transform)
375375

376376
@Deprecated(
377377
level = DeprecationLevel.ERROR,
378378
message = "Flow analogue of 'combineLatest' is 'combine'",
379379
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
380380
)
381-
public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
381+
public fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
382382
other: Flow<T2>,
383383
other2: Flow<T3>,
384384
other3: Flow<T4>,
385-
crossinline transform: suspend (T1, T2, T3, T4) -> R
385+
transform: suspend (T1, T2, T3, T4) -> R
386386
) = combine(this, other, other2, other3, transform)
387387

388388
@Deprecated(
389389
level = DeprecationLevel.ERROR,
390390
message = "Flow analogue of 'combineLatest' is 'combine'",
391391
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
392392
)
393-
public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
393+
public fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
394394
other: Flow<T2>,
395395
other2: Flow<T3>,
396396
other3: Flow<T4>,
397397
other4: Flow<T5>,
398-
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
398+
transform: suspend (T1, T2, T3, T4, T5) -> R
399399
): Flow<R> = combine(this, other, other2, other3, other4, transform)
400400

401401
/**
@@ -482,4 +482,4 @@ public fun <T> Flow<T>.replay(bufferSize: Int): Flow<T> = noImpl()
482482
message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'",
483483
replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)")
484484
)
485-
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
485+
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()

Diff for: kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+20-6
Original file line numberDiff line numberDiff line change
@@ -224,19 +224,33 @@ private class UndispatchedContextCollector<T>(
224224
private val emitRef: suspend (T) -> Unit = { downstream.emit(it) } // allocate suspend function ref once on creation
225225

226226
override suspend fun emit(value: T): Unit =
227-
withContextUndispatched(emitContext, countOrElement, emitRef, value)
227+
withContextUndispatched(emitContext, value, countOrElement, emitRef)
228228
}
229229

230230
// Efficiently computes block(value) in the newContext
231-
private suspend fun <T, V> withContextUndispatched(
231+
internal suspend fun <T, V> withContextUndispatched(
232232
newContext: CoroutineContext,
233+
value: V,
233234
countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed
234-
block: suspend (V) -> T, value: V
235+
block: suspend (V) -> T
235236
): T =
236237
suspendCoroutineUninterceptedOrReturn { uCont ->
237238
withCoroutineContext(newContext, countOrElement) {
238-
block.startCoroutineUninterceptedOrReturn(value, Continuation(newContext) {
239-
uCont.resumeWith(it)
240-
})
239+
block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))
241240
}
242241
}
242+
243+
// Continuation that links the caller with uCont with walkable CoroutineStackFrame
244+
private class StackFrameContinuation<T>(
245+
private val uCont: Continuation<T>, override val context: CoroutineContext
246+
) : Continuation<T>, CoroutineStackFrame {
247+
248+
override val callerFrame: CoroutineStackFrame?
249+
get() = uCont as? CoroutineStackFrame
250+
251+
override fun resumeWith(result: Result<T>) {
252+
uCont.resumeWith(result)
253+
}
254+
255+
override fun getStackTraceElement(): StackTraceElement? = null
256+
}

0 commit comments

Comments
 (0)