diff --git a/README.md b/README.md index 08417dd0f5..ff6c545341 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ [![Slack channel](https://img.shields.io/badge/chat-slack-green.svg?logo=slack)](https://kotlinlang.slack.com/messages/coroutines/) Library support for Kotlin coroutines with [multiplatform](#multiplatform) support. -This is a companion version for Kotlin `1.4.30` release. +This is a companion version for the Kotlin `1.4.30` release. ```kotlin suspend fun main() = coroutineScope { @@ -75,7 +75,7 @@ suspend fun main() = coroutineScope { ## Using in your projects -The libraries are published to [kotlinx](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines) bintray repository, +The libraries are published to [kotlinx](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines) Bintray repository, linked to [JCenter](https://bintray.com/bintray/jcenter?filterByPkgName=kotlinx.coroutines) and pushed to [Maven Central](https://search.maven.org/#search%7Cga%7C1%7Cg%3Aorg.jetbrains.kotlinx%20a%3Akotlinx-coroutines*). @@ -148,16 +148,16 @@ Make sure that you have `mavenCentral()` in the list of repositories. ### Android Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android) -module as dependency when using `kotlinx.coroutines` on Android: +module as a dependency when using `kotlinx.coroutines` on Android: ```groovy implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.3' ``` -This gives you access to Android [Dispatchers.Main] -coroutine dispatcher and also makes sure that in case of crashed coroutine with unhandled exception this -exception is logged before crashing Android application, similarly to the way uncaught exceptions in -threads are handled by Android runtime. +This gives you access to the Android [Dispatchers.Main] +coroutine dispatcher and also makes sure that in case of a crashed coroutine with an unhandled exception that +this exception is logged before crashing the Android application, similarly to the way uncaught exceptions in +threads are handled by the Android runtime. #### R8 and ProGuard @@ -168,7 +168,7 @@ For more details see ["Optimization" section for Android](ui/kotlinx-coroutines- The `kotlinx-coroutines-core` artifact contains a resource file that is not required for the coroutines to operate normally and is only used by the debugger. To exclude it at no loss of functionality, add the following snippet to the -`android` block in your gradle file for the application subproject: +`android` block in your Gradle file for the application subproject: ```groovy packagingOptions { exclude "DebugProbesKt.bin" @@ -180,7 +180,7 @@ packagingOptions { Core modules of `kotlinx.coroutines` are also available for [Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) and [Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html). -In common code that should get compiled for different platforms, you can add dependency to `kotlinx-coroutines-core` right to the `commonMain` source set: +In common code that should get compiled for different platforms, you can add a dependency to `kotlinx-coroutines-core` right to the `commonMain` source set: ```groovy commonMain { dependencies { @@ -189,7 +189,7 @@ commonMain { } ``` -No more additional dependencies is needed, platform-specific artifacts will be resolved automatically via Gradle metadata available since Gradle 5.3. +No more additional dependencies are needed, platform-specific artifacts will be resolved automatically via Gradle metadata available since Gradle 5.3. Platform-specific dependencies are recommended to be used only for non-multiplatform projects that are compiled only for target platform. @@ -207,11 +207,11 @@ the target Kotlin/Native platform. [List of currently supported targets](https:/ Only single-threaded code (JS-style) on Kotlin/Native is supported in stable versions. -Additionally, special `-native-mt` version is released on a regular basis, for the state of multi-threaded coroutines support +Additionally, a special `-native-mt` version is released on a regular basis, for the state of multi-threaded coroutines support please follow the [corresponding issue](https://github.com/Kotlin/kotlinx.coroutines/issues/462) for the additional details. Since Kotlin/Native does not generally provide binary compatibility between versions, -you should use the same version of Kotlin/Native compiler as was used to build `kotlinx.coroutines`. +you should use the same version of the Kotlin/Native compiler as was used to build `kotlinx.coroutines`. ## Building and Contributing diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 23f29b02a3..ce5224dd85 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -860,6 +860,20 @@ public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/ public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public abstract interface class kotlinx/coroutines/flow/ChunkingMethod { + public static final field Companion Lkotlinx/coroutines/flow/ChunkingMethod$Companion; + public abstract fun chunk (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; +} + +public final class kotlinx/coroutines/flow/ChunkingMethod$Companion { + public final fun BySize (I)Lkotlinx/coroutines/flow/ChunkingMethod; + public final fun ByTime (JI)Lkotlinx/coroutines/flow/ChunkingMethod; + public static synthetic fun ByTime$default (Lkotlinx/coroutines/flow/ChunkingMethod$Companion;JIILjava/lang/Object;)Lkotlinx/coroutines/flow/ChunkingMethod; + public final fun ByTimeOrSize (JI)Lkotlinx/coroutines/flow/ChunkingMethod; + public final fun Natural (I)Lkotlinx/coroutines/flow/ChunkingMethod; + public static synthetic fun Natural$default (Lkotlinx/coroutines/flow/ChunkingMethod$Companion;IILjava/lang/Object;)Lkotlinx/coroutines/flow/ChunkingMethod; +} + public abstract interface class kotlinx/coroutines/flow/Flow { public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -894,6 +908,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun chunked (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/ChunkingMethod;)Lkotlinx/coroutines/flow/Flow; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index 74d3314075..887f354223 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -21,7 +21,7 @@ import kotlin.native.concurrent.* * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a state flow is called a _subscriber_. * * A [mutable state flow][MutableStateFlow] is created using `MutableStateFlow(value)` constructor function with - * the initial value. The value of mutable state flow can be updated by setting its [value] property. + * the initial value. The value of mutable state flow can be updated by setting its [value] property. * Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates, * but always collects the most recently emitted value. * diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt new file mode 100644 index 0000000000..b0a326391c --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -0,0 +1,211 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.selects.* +import kotlin.jvm.* + +/** + * Groups emissions from this Flow into lists, according to the chosen ChunkingMethod. Time based implementations + * collect upstream and emit to downstream in separate coroutines - concurrently, like Flow.buffer() operator. + * Exact timing of emissions is not guaranteed, as it depends on collector coroutine availability. + * + * Size based chunking happens in a single coroutine and is purely sequential. + * + * Emissions always preserve order. + * + * It is possible to pass custom implementation of ChunkingMethod to chunked() operator. + * + * @param method Defines constrains on chunk size and time of its emission. + */ + +@ExperimentalCoroutinesApi +public fun Flow.chunked(method: ChunkingMethod): Flow> = with(method) { chunk() } + +@ExperimentalCoroutinesApi +public interface ChunkingMethod { + public fun Flow.chunk(): Flow> + + public companion object { + + /** + * Collects upstream and emits to downstream in separate coroutines - as soon as possible. If consumer keeps + * up with the producer, it emits lists with single element. + * + * In case of slow consumer, it groups emissions into bigger lists. When consumer "speeds up", chunks + * will get smaller. + * + * @param maxSize Maximum size of a single chunk. If reached, producer gets suspended until consumer "consumes" + * a chunk. If maxSize is not specified, then chunk may grow indefinitely until jvm runs out of memory. + */ + @Suppress("FunctionName") + public fun Natural(maxSize: Int = Int.MAX_VALUE): ChunkingMethod = NaturalChunking(maxSize) + + /** + * Collects upstream into a buffer and emits its content as a list at every interval. When upstream completes + * (or is empty), it will try to emit immediately what is left of a chunk, omitting the interval. + * + * @param intervalMs Interval between emissions in milliseconds. Every emission happens only after + * interval passes, unless upstream Flow completes sooner. + * + * @param maxSize Maximum size of a single chunk. If reached, producer gets suspended until consumer "consumes" + * a chunk. If maxSize is not specified, then chunk may grow indefinitely until jvm runs out of memory. + */ + @Suppress("FunctionName") + public fun ByTime(intervalMs: Long, maxSize: Int = Int.MAX_VALUE): ChunkingMethod = + TimeBased(intervalMs, maxSize) + + /** + * Collects upstream into a buffer and emits its content as a list at every interval or when its buffer reaches + * maximum size. When upstream completes (or is empty), it will try to emit immediately what is left of + * a chunk, omitting the interval and maxSize constraints. + * + * @param intervalMs Interval between emissions in milliseconds. Every emission happens only after + * interval passes, unless upstream Flow completes sooner or maximum size of a chunk is reached. + * + * @param maxSize Maximum size of a single chunk. If reached, it will try to emit a chunk, ignoring the + * interval constraint. If so happens, time-to-next-chunk gets reset to the interval value. + */ + @Suppress("FunctionName") + public fun ByTimeOrSize(intervalMs: Long, maxSize: Int): ChunkingMethod = TimeOrSizeBased(intervalMs, maxSize) + + /** + * Collects upstream into a buffer and emits its content as a list, when specified size is reached. + * This implementation is purely sequential. If concurrent upstream collection and downstream emissions are + * desired, one can use a buffer() operator after chunking + * + * @param size Exact size of emitted chunks. Only the last emission may be smaller. + */ + @Suppress("FunctionName") + public fun BySize(size: Int): ChunkingMethod = SizeBased(size) + } +} + +private class NaturalChunking(private val maxSize: Int) : ChunkingMethod { + + init { + requirePositive(maxSize) + } + + override fun Flow.chunk(): Flow> = scopedFlow { downstream -> + val upstream = buffer(maxSize).produceIn(this) + + while (!upstream.isClosedForReceive) { + val chunk = upstream.awaitFirstAndDrain(maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + } + } +} + +private class TimeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod { + + init { + requirePositive(intervalMs) + requirePositive(maxSize) + } + + override fun Flow.chunk(): Flow> = scopedFlow { downstream -> + val upstreamCollection = Job() + val upstream = produce(capacity = maxSize) { + collect { element -> channel.send(element) } + upstreamCollection.complete() + } + + whileSelect { + upstreamCollection.onJoin { + val chunk = upstream.drain(maxElements = maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + false + } + + onTimeout(intervalMs) { + val chunk = upstream.drain(maxElements = maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + true + } + } + } +} + +private class SizeBased(private val size: Int) : ChunkingMethod { + + init { + requirePositive(size) + } + + override fun Flow.chunk(): Flow> = flow { + val accumulator = ArrayList(size) + collect { element -> + accumulator.add(element) + if (accumulator.size == size) emit(accumulator.drain()) + } + if (accumulator.isNotEmpty()) emit(accumulator) + } +} + +private class TimeOrSizeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod { + + init { + requirePositive(intervalMs) + requirePositive(maxSize) + } + + override fun Flow.chunk(): Flow> = scopedFlow { downstream -> + val emitNowAndMaybeContinue = Channel(capacity = Channel.RENDEZVOUS) + val elements = produce(capacity = maxSize) { + collect { element -> + val hasCapacity = channel.trySend(element).isSuccess + if (!hasCapacity) { + emitNowAndMaybeContinue.send(true) + channel.send(element) + } + } + emitNowAndMaybeContinue.send(false) + } + + whileSelect { + emitNowAndMaybeContinue.onReceive { shouldContinue -> + val chunk = elements.drain(maxElements = maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + shouldContinue + } + + onTimeout(intervalMs) { + val chunk = elements.drain(maxElements = maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + true + } + } + } + +} + +private suspend fun ReceiveChannel.awaitFirstAndDrain(maxElements: Int): List = try { + val first = receive() + drain(mutableListOf(first), maxElements) +} catch (e: ClosedReceiveChannelException) { + emptyList() +} + + +private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf(), maxElements: Int): List = + if (acc.size == maxElements) acc + else { + val nextValue = tryReceive().getOrElse { error: Throwable? -> error?.let { throw(it) } ?: return acc } + acc.add(nextValue) + drain(acc, maxElements) + } + +private fun MutableList.drain() = toList().also { this.clear() } + +private fun requirePositive(size: Int) = require(size > 0) + +private fun requirePositive(intervalMs: Long) = require(intervalMs > 0) \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt new file mode 100644 index 0000000000..1b70eee4be --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -0,0 +1,236 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* +import kotlin.time.* + +@ExperimentalTime +class ChunkedTest : TestBase() { + + @Test + fun testEmptyFlowSizeBasedChunking() = runTest { + val emptyFlow = emptyFlow() + val result = emptyFlow.chunked(ChunkingMethod.BySize(5)).toList() + assertTrue(result.isEmpty()) + } + + @Test + fun testUndersizedFlowSizeBasedChunking() = runTest { + val undersizedFlow = flow { + for (i in 1..3) emit(i) + } + val result = undersizedFlow.chunked(ChunkingMethod.BySize(5)).toList() + assertEquals(1, result.size) + assertEquals(listOf(1, 2, 3), result.first()) + } + + @Test + fun testOversizedFlowSizeBasedChunking() = runTest { + val oversizedFlow = flow { + for (i in 1..10) emit(i) + } + val result = oversizedFlow.chunked(ChunkingMethod.BySize(3)).toList() + assertEquals(4, result.size) + assertEquals(3, result.first().size) + assertEquals(1, result[3].size) + + } + + @Test + fun testEmptyFlowNaturalChunking() = runTest { + val emptyFlow = emptyFlow() + val result = emptyFlow.chunked(ChunkingMethod.Natural()).toList() + assertTrue(result.isEmpty()) + } + + @Test + fun testFastCollectorNaturalChunking() = withVirtualTime { + val slowProducer = flow { + for (i in 1..10) { + delay(5) + emit(i) + } + } + + val result = slowProducer.chunked(ChunkingMethod.Natural()).toList() + assertEquals(10, result.size) + result.forEach { assertEquals(1, it.size) } + + finish(1) + } + + @Test + fun testSlowCollectorNaturalChunking() = withVirtualTime { + val producerInterval = 5L + val fastProducer = flow { + emit(1) + expect(1) + delay(producerInterval) + + emit(2) + expect(3) + delay(producerInterval) + + emit(3) + expect(4) + delay(producerInterval) + + emit(4) + expect(6) + delay(producerInterval) + + emit(5) + expect(7) + delay(producerInterval) + } + + val result = fastProducer.chunked(ChunkingMethod.Natural()).withIndex().onEach { indexed -> + when (indexed.index) { + 0 -> expect(2) + 1 -> expect(5) + 2 -> finish(8) + } + delay(11) + }.toList() + + assertEquals(3, result.size) + assertEquals(1, result.first().value.size) + for (i in 1..2) assertEquals(2, result[i].value.size) + } + + @Test + fun testErrorPropagationInNaturalChunking() = runTest { + val exception = IllegalArgumentException() + val failedFlow = flow { + emit(1) + emit(2) + throw exception + } + var catchedException: Throwable? = null + + val result = failedFlow + .chunked(ChunkingMethod.Natural()) + .catch { e -> + catchedException = e + emit(listOf(3)) + } + .toList() + + assertTrue(catchedException is IllegalArgumentException) + assertEquals(3, result.first().single()) + } + + @Test + fun testEmptyFlowWithSlowTimeBasedChunking() = runTest { + val emptyFlow = emptyFlow() + val result = measureTimedValue { emptyFlow.chunked(ChunkingMethod.ByTime(intervalMs = 10 * 1000)).toList() } + assertTrue(result.value.isEmpty()) + assertTrue(result.duration < 500.milliseconds) + } + + @Test + fun testErrorPropagationInTimeBasedChunking() = runTest { + val exception = IllegalArgumentException() + val failedFlow = flow { + emit(1) + emit(2) + throw exception + } + var catchedException: Throwable? = null + + val result = failedFlow + .chunked(ChunkingMethod.ByTime(10 * 10_000)) + .catch { e -> + catchedException = e + emit(listOf(3)) + } + .toList() + + assertTrue(catchedException is IllegalArgumentException) + assertEquals(3, result.first().single()) + } + + @Test + fun testTimeBasedChunkingOfMultipleElements() = withVirtualTime { + val producer = flow { + for (i in 1..10) { + delay(1000) + emit(i) + } + } + + val result = producer.chunked(ChunkingMethod.ByTime(5500)).toList() + + finish(1) + + assertEquals(2, result.size) + assertEquals(5, result.first().size) + assertEquals(5, result[1].size) + } + + @Test + fun testTimeBasedChunkingWithMaxChunkSizeSuspendingProducer() = runTest { + val producer = flow { + for (i in 1..10) { + emit(i) + } + } + + val result = measureTimedValue { producer.chunked(ChunkingMethod.ByTime(200, maxSize = 5)).toList() } + + finish(1) + + assertEquals(2, result.value.size) + assertEquals(5, result.value.first().size) + assertEquals(5, result.value[1].size) + assertTrue(result.duration >= 200.milliseconds, "expected time at least 200 ms but was: ${result.duration}") + } + + @Test + fun testEmptyFlowTimeOrSizeBasedChunking() = runTest { + val emptyFlow = emptyFlow() + val result = measureTimedValue { + emptyFlow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 10 * 1000, maxSize = 5)).toList() + } + assertTrue(result.value.isEmpty()) + assertTrue(result.duration < 500.milliseconds) + } + + @Test + fun testMultipleElementsFillingBufferWithTimeOrSizeBasedChunking() = runTest { + val flow = flow { + for (i in 1..10) { + emit(i) + } + } + val result = measureTimedValue { + flow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 10 * 1000, maxSize = 5)).toList() + } + assertEquals(2, result.value.size) + assertEquals(5, result.value.first().size) + assertEquals(5, result.value[1].size) + assertTrue(result.duration < 500.milliseconds) + } + + @Test + fun testMultipleElementsNotFillingBufferWithTimeOrSizeBasedChunking() = withVirtualTime { + val flow = flow { + for (i in 1..5) { + delay(500) + emit(i) + } + } + val result = flow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 1100, maxSize = 500)).toList() + + assertEquals(3, result.size) + assertEquals(2, result.first().size) + assertEquals(2, result[1].size) + assertEquals(1, result[2].size) + + finish(1) + } +} \ No newline at end of file