From 6fb01b9f39d89c219841899ce357af220b08d461 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Tue, 10 Nov 2020 12:57:32 +0100 Subject: [PATCH 01/18] Add time- and size-based chunking operators --- .../common/src/flow/operators/Chunked.kt | 313 ++++++++++++++++++ .../common/test/flow/operators/ChunkedTest.kt | 231 +++++++++++++ 2 files changed, 544 insertions(+) create mode 100644 kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt new file mode 100644 index 0000000000..5bc80ea9aa --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -0,0 +1,313 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.selects.* +import kotlin.jvm.* +import kotlin.math.* +import kotlin.time.* + +private const val NO_MAXIMUM = -1 + +public fun Flow.chunked(maxSize: Int, minSize: Int = 1): Flow> { + require(minSize in 0 until maxSize) + return flow { + val accumulator = mutableListOf() + collect { value -> + accumulator.add(value) + if (accumulator.size == maxSize) emit(accumulator.drain()) + } + if (accumulator.size >= minSize) emit(accumulator) + } +} + +@ExperimentalTime +public fun Flow.chunked( + chunkDuration: Duration, + minSize: Int = 1, + maxSize: Int = NO_MAXIMUM +): Flow> = chunked(chunkDuration.toDelayMillis(), minSize, maxSize) + +public fun Flow.chunked( + chunkDurationMs: Long, + minSize: Int = 1, + maxSize: Int = NO_MAXIMUM +): Flow> { + require(chunkDurationMs > 0) + require(minSize >= 0) + require(maxSize == NO_MAXIMUM || 
maxSize >= minSize) + + return if (minSize == 0 && maxSize == NO_MAXIMUM) chunkFixedTimeWindows(chunkDurationMs) + else if (minSize == 0) chunkContinousWindows(chunkDurationMs, maxSize) + else chunkFloatingWindows(chunkDurationMs, minSize, maxSize) +} + +public fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = scopedFlow { downstream -> + val upstream = produce(capacity = Channel.CHANNEL_DEFAULT_CAPACITY) { + val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } + launch { + for (tick in ticker) send(Signal.TimeIsUp) + } + collect { value -> send(Signal.NewElement(value)) } + ticker.close() + } + val accumulator = mutableListOf() + + for (signal in upstream) { + when (signal) { + is Signal.NewElement -> accumulator.add(signal.value) + is Signal.TimeIsUp -> downstream.emit(accumulator.drain()) + } + } + if (accumulator.isNotEmpty()) downstream.emit(accumulator) +} + +public fun Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow> = + scopedFlow { downstream -> + val inbox: ReceiveChannel = this@chunkContinousWindows.produceIn(this) + val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } + val accumulator = mutableListOf() + + whileSelect { + inbox.onReceiveOrClosed.invoke { valueOrClosed -> + val isOpen = !valueOrClosed.isClosed + if (isOpen) { + accumulator.add(valueOrClosed.value) + if(accumulator.size == maxSize){ + ticker.send(Ticker.Message.Reset) + downstream.emit(accumulator.drain()) + ticker.send(Ticker.Message.Start) + } + } + isOpen + } + ticker.onReceive.invoke { + downstream.emit(accumulator.drain()) + true + } + } + + ticker.close() + if (accumulator.isNotEmpty()) downstream.emit(accumulator) + } + +public fun Flow.chunkFloatingWindows( + durationMs: Long, + minSize: Int, + maxSize: Int, +): Flow> { + + return scopedFlow { downstream -> + val upstream: ReceiveChannel = this@chunkFloatingWindows.produceIn(this) + val ticker = Ticker(durationMs, this) + val accumulator = mutableListOf() + + 
whileSelect { + upstream.onReceiveOrClosed.invoke { valueOrClosed -> + val isOpen = valueOrClosed.isClosed.not() + if (isOpen) { + if (accumulator.isEmpty()) ticker.send(Ticker.Message.Start) + accumulator.add(valueOrClosed.value) + if (accumulator.size == maxSize) { + ticker.send(Ticker.Message.Reset) + downstream.emit(accumulator.drain()) + } + } + isOpen + } + ticker.onReceive.invoke { + if (accumulator.size >= minSize) downstream.emit(accumulator.drain()) + true + } + } + + ticker.close() + if (accumulator.size >= minSize) downstream.emit(accumulator) + } +} + +private class Ticker( + private val intervalMs: Long, + scope: CoroutineScope, + private val inbox: Channel = Channel(), + private val ticks: Channel = Channel() +) : SendChannel by inbox, ReceiveChannel by ticks { + + init { + scope.processMessages() + } + + private fun CoroutineScope.processMessages() = launch { + var ticker = setupTicks() + for (message in inbox) { + when (message) { + Message.Start -> ticker.start() + Message.Reset -> { + ticker.cancel() + ticker = setupTicks() + } + } + } + ticker.cancel() + ticks.cancel() + } + + private fun CoroutineScope.setupTicks() = launch(start = CoroutineStart.LAZY) { + while (true) { + delay(intervalMs) + ticks.send(Unit) + } + } + + sealed class Message { + object Start : Message() + object Reset : Message() + } +} + +public fun Flow.chunkedNonResetableTimer( + durationMs: Long, + fixedIntervals: Boolean = false, + minSize: Int = 0, + maxSize: Int = NO_MAXIMUM, +): Flow> { + require(minSize >= 0) + require(maxSize == NO_MAXIMUM || maxSize >= minSize) + return scopedFlow { downstream -> + val chunker: Chunker = + if (fixedIntervals) EagerChunker(durationMs, minSize, maxSize) + else LazyChunker(durationMs, minSize, maxSize) + + launch { + collect { value -> chunker.send(value) } + chunker.close() + } + downstream.emitAll(chunker.chunks) + } +} + +private sealed class Signal { + class NewElement(val value: T) : Signal() + object TimeIsUp : Signal() +} + 
+private sealed class Message { + class NewElement(val value: T) : Message() + object TimeIsUp : Message() + object Stop : Message() +} + +private abstract class Chunker { + protected val inbox = Channel>(Channel.CHANNEL_DEFAULT_CAPACITY) + + abstract val chunks: Flow> + + suspend fun send(value: T) { + inbox.send(Message.NewElement(value)) + } + + suspend fun close() { + inbox.send(Message.Stop) + } +} + + +private class LazyChunker( + private val durationMs: Long, + private val minSize: Int, + private val maxSize: Int +) : Chunker() { + + override val chunks = flow { + coroutineScope { + val accumulator = ArrayList() + var timer: Job? = null + + for (message in inbox) { + when (message) { + is Message.NewElement -> { + if (accumulator.isEmpty()) timer = startTimer() + accumulator.add(message.value) + if (accumulator.size == maxSize) { + timer?.cancelAndJoin() + emit(accumulator.drain()) + } + } + + Message.TimeIsUp -> { + if (accumulator.size >= minSize) { + emit(accumulator.drain()) + } + } + + Message.Stop -> { + timer?.cancelAndJoin() + inbox.close() + } + } + } + + if (accumulator.size >= max(1, minSize)) emit(accumulator) + } + } + + private fun CoroutineScope.startTimer() = launch { + delay(durationMs) + inbox.send(Message.TimeIsUp) + } +} + +private class EagerChunker( + private val durationMs: Long, + private val minSize: Int, + private val maxSize: Int +) : Chunker() { + + override val chunks: Flow> = flow { + coroutineScope { + val accumulator = ArrayList() + val timer = startTimer() + + for (message in inbox) { + when (message) { + is Message.NewElement -> { + accumulator.add(message.value) + if (accumulator.size == maxSize) { + emit(accumulator.drain()) + } + } + + Message.TimeIsUp -> { + if (accumulator.size >= minSize) { + emit(accumulator.drain()) + } + } + + Message.Stop -> { + timer.cancelAndJoin() + inbox.close() + } + } + } + + if (accumulator.size >= max(1, minSize)) emit(accumulator) + } + } + + private fun CoroutineScope.startTimer() = 
launch { + while (true) { + delay(durationMs) + inbox.send(Message.TimeIsUp) + } + } +} + +private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt new file mode 100644 index 0000000000..f24c76ecd5 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -0,0 +1,231 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* +import kotlin.time.* + +@ExperimentalTime +class ChunkedTest : TestBase() { + + private val testFlow = flow { + delay(10) + for (i in 1..10_000_000) { + emit(i) +// delay(500) +// for (j in 1..100_000) { +// emit(j) +// } + } + } + + private fun Flow.channelTransform() = channelFlow { + val inbox = produceIn(this) + repeat(4) { + launch(Dispatchers.Default) { + for (value in inbox) send(value.toString().toInt() * 2) + } + } + } + + @Test + fun testLazy() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunkedNonResetableTimer(100, false) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun testEager() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunkedNonResetableTimer(100, true) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun testSelectFixedInterval() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunked(100.toDuration(DurationUnit.MILLISECONDS), minSize = 0) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun 
testSelectFloatingInterval() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunked(100.toDuration(DurationUnit.MILLISECONDS)) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun testChunkNoMaxTime() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunkFixedTimeWindows(100) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun testEmptyFlow() = runTest { + val emptyFlow = emptyFlow() + val result = emptyFlow.chunked(1000).toList() + + assertTrue { result.isEmpty() } + } + + @ExperimentalTime + @Test + fun testSingleFastElement() = runTest { + val fastFlow = flow { emit(1) } + val result = measureTimedValue { + fastFlow.chunked(10_000).toList() + } + + assertTrue { result.value.size == 1 && result.value.first().contains(1) } + assertTrue { result.duration.inSeconds < 1 } + } + + @Test + fun testWindowsChunkingWithNoMinimumSize() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + delay(1500) + emit(2) + delay(1500) + emit(3) + } + val chunks = intervalFlow.chunked(2.toDuration(DurationUnit.SECONDS), minSize = 0).toList() + + assertEquals (3, chunks.size) + assertTrue { chunks.all { it.size == 1 } } + + finish(1) + } + + @Test + fun testNonContinousWindowsChunking() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + delay(1500) + emit(2) + delay(1500) + emit(3) + } + val chunks = intervalFlow.chunked(2.toDuration(DurationUnit.SECONDS)).toList() + + assertEquals (2, chunks.size) + assertTrue { chunks.first().size == 2 && chunks[1].size == 1 } + + finish(1) + } + + @Test + fun testRespectingMinValue() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + delay(1500) + emit(2) + delay(1500) + emit(3) + } + val chunks = intervalFlow.chunked(2000, minSize = 
3).toList() + + assertTrue { chunks.size == 1 } + assertTrue { chunks.first().size == 3 } + + finish(1) + } + + @Test + fun testRespectingMaxValueWithContinousWindows() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + emit(2) + emit(3) + emit(4) + delay(1500) + emit(5) + delay(1500) + emit(6) + } + val chunks = intervalFlow.chunked(2000, minSize = 0, maxSize = 3).toList() + + assertEquals(3, chunks.size) + assertEquals(3, chunks.first().size) + assertEquals(2, chunks[1].size) + assertTrue { chunks[1].containsAll(listOf(4, 5)) } + + finish(1) + } + + @Test + fun testRespectingMaxValueAndResetingTickerWithNonContinousWindows() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + emit(2) + emit(3) + delay(1500) + emit(4) + emit(5) + delay(1500) + emit(6) + } + val chunks = intervalFlow.chunked(2000, maxSize = 3).toList() + + assertEquals(2, chunks.size) + assertEquals(3, chunks.first().size) + assertEquals(3, chunks[1].size) + assertTrue { chunks[1].containsAll(listOf(4, 5, 6)) } + + finish(1) + } + + @Test + fun testSizeBasedChunking() = runTest { + val flow = flow { + for (i in 1..10) emit(i) + } + + val chunks = flow.chunked(maxSize = 3).toList() + + assertEquals(4, chunks.size) + } + + @Test + fun testSizeBasedChunkingWithMinSize() = runTest { + val flow = flow { + for (i in 1..10) emit(i) + } + + val chunks = flow.chunked(maxSize = 3, minSize = 2).toList() + + assertEquals(3, chunks.size) + } + +} \ No newline at end of file From 43bfcfbf862706e6407876590b891d82d4e8b7da Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Tue, 10 Nov 2020 13:22:31 +0100 Subject: [PATCH 02/18] Remove unused operators --- .../common/src/flow/operators/Chunked.kt | 132 ------------------ 1 file changed, 132 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt index 5bc80ea9aa..92519602b9 100644 --- 
a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -173,141 +173,9 @@ private class Ticker( } } -public fun Flow.chunkedNonResetableTimer( - durationMs: Long, - fixedIntervals: Boolean = false, - minSize: Int = 0, - maxSize: Int = NO_MAXIMUM, -): Flow> { - require(minSize >= 0) - require(maxSize == NO_MAXIMUM || maxSize >= minSize) - return scopedFlow { downstream -> - val chunker: Chunker = - if (fixedIntervals) EagerChunker(durationMs, minSize, maxSize) - else LazyChunker(durationMs, minSize, maxSize) - - launch { - collect { value -> chunker.send(value) } - chunker.close() - } - downstream.emitAll(chunker.chunks) - } -} - private sealed class Signal { class NewElement(val value: T) : Signal() object TimeIsUp : Signal() } -private sealed class Message { - class NewElement(val value: T) : Message() - object TimeIsUp : Message() - object Stop : Message() -} - -private abstract class Chunker { - protected val inbox = Channel>(Channel.CHANNEL_DEFAULT_CAPACITY) - - abstract val chunks: Flow> - - suspend fun send(value: T) { - inbox.send(Message.NewElement(value)) - } - - suspend fun close() { - inbox.send(Message.Stop) - } -} - - -private class LazyChunker( - private val durationMs: Long, - private val minSize: Int, - private val maxSize: Int -) : Chunker() { - - override val chunks = flow { - coroutineScope { - val accumulator = ArrayList() - var timer: Job? 
= null - - for (message in inbox) { - when (message) { - is Message.NewElement -> { - if (accumulator.isEmpty()) timer = startTimer() - accumulator.add(message.value) - if (accumulator.size == maxSize) { - timer?.cancelAndJoin() - emit(accumulator.drain()) - } - } - - Message.TimeIsUp -> { - if (accumulator.size >= minSize) { - emit(accumulator.drain()) - } - } - - Message.Stop -> { - timer?.cancelAndJoin() - inbox.close() - } - } - } - - if (accumulator.size >= max(1, minSize)) emit(accumulator) - } - } - - private fun CoroutineScope.startTimer() = launch { - delay(durationMs) - inbox.send(Message.TimeIsUp) - } -} - -private class EagerChunker( - private val durationMs: Long, - private val minSize: Int, - private val maxSize: Int -) : Chunker() { - - override val chunks: Flow> = flow { - coroutineScope { - val accumulator = ArrayList() - val timer = startTimer() - - for (message in inbox) { - when (message) { - is Message.NewElement -> { - accumulator.add(message.value) - if (accumulator.size == maxSize) { - emit(accumulator.drain()) - } - } - - Message.TimeIsUp -> { - if (accumulator.size >= minSize) { - emit(accumulator.drain()) - } - } - - Message.Stop -> { - timer.cancelAndJoin() - inbox.close() - } - } - } - - if (accumulator.size >= max(1, minSize)) emit(accumulator) - } - } - - private fun CoroutineScope.startTimer() = launch { - while (true) { - delay(durationMs) - inbox.send(Message.TimeIsUp) - } - } -} - private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file From c378678663d260dde3d9f0b3695f60b35ef8b670 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Tue, 10 Nov 2020 13:37:05 +0100 Subject: [PATCH 03/18] Add visibility modifiers and clarify tests --- .../common/src/flow/operators/Chunked.kt | 6 +- .../common/test/flow/operators/ChunkedTest.kt | 110 +++++------------- 2 files changed, 30 insertions(+), 86 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt 
b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt index 92519602b9..aad650a91e 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -50,7 +50,7 @@ public fun Flow.chunked( else chunkFloatingWindows(chunkDurationMs, minSize, maxSize) } -public fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = scopedFlow { downstream -> +private fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = scopedFlow { downstream -> val upstream = produce(capacity = Channel.CHANNEL_DEFAULT_CAPACITY) { val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } launch { @@ -70,7 +70,7 @@ public fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = if (accumulator.isNotEmpty()) downstream.emit(accumulator) } -public fun Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow> = +private fun Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow> = scopedFlow { downstream -> val inbox: ReceiveChannel = this@chunkContinousWindows.produceIn(this) val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } @@ -99,7 +99,7 @@ public fun Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Fl if (accumulator.isNotEmpty()) downstream.emit(accumulator) } -public fun Flow.chunkFloatingWindows( +private fun Flow.chunkFloatingWindows( durationMs: Long, minSize: Int, maxSize: Int, diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt index f24c76ecd5..df7dd66f0f 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -11,103 +11,47 @@ import kotlin.time.* @ExperimentalTime class ChunkedTest : TestBase() { - private val testFlow = flow { - delay(10) - for (i in 1..10_000_000) { - emit(i) -// delay(500) -// for (j in 1..100_000) { -// emit(j) -// } 
- } - } - - private fun Flow.channelTransform() = channelFlow { - val inbox = produceIn(this) - repeat(4) { - launch(Dispatchers.Default) { - for (value in inbox) send(value.toString().toInt() * 2) - } - } - } - - @Test - fun testLazy() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunkedNonResetableTimer(100, false) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: $emissionsCount") } - } - } - @Test - fun testEager() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunkedNonResetableTimer(100, true) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: $emissionsCount") } + fun testEmptyFlowChunking() = runTest { + val emptyFlow = emptyFlow() + val result = measureTimedValue { + emptyFlow.chunked(10.seconds).toList() } - } - @Test - fun testSelectFixedInterval() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunked(100.toDuration(DurationUnit.MILLISECONDS), minSize = 0) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: $emissionsCount") } - } + assertTrue { result.value.isEmpty() } + assertTrue { result.duration.inSeconds < 1 } } + @ExperimentalTime @Test - fun testSelectFloatingInterval() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunked(100.toDuration(DurationUnit.MILLISECONDS)) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: $emissionsCount") } - } - } + fun testSingleFastElementChunking() = runTest { + val fastFlow = flow { emit(1) } - @Test - fun testChunkNoMaxTime() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunkFixedTimeWindows(100) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: $emissionsCount") } + val result = measureTimedValue { + 
fastFlow.chunked(10.seconds).toList() } - } - - @Test - fun testEmptyFlow() = runTest { - val emptyFlow = emptyFlow() - val result = emptyFlow.chunked(1000).toList() - assertTrue { result.isEmpty() } + assertTrue { result.value.size == 1 && result.value.first().contains(1) } + assertTrue { result.duration.inSeconds < 1 } } @ExperimentalTime @Test - fun testSingleFastElement() = runTest { - val fastFlow = flow { emit(1) } + fun testMultipleFastElementsChunking() = runTest { + val fastFlow = flow { + for(i in 1..1000) emit(1) + } + val result = measureTimedValue { - fastFlow.chunked(10_000).toList() + fastFlow.chunked(10.seconds).toList() } - assertTrue { result.value.size == 1 && result.value.first().contains(1) } + assertTrue { result.value.size == 1 && result.value.first().size == 1000 } assertTrue { result.duration.inSeconds < 1 } } @Test - fun testWindowsChunkingWithNoMinimumSize() = withVirtualTime { + fun testFixedTimeWindowChunkingWithZeroMinimumSize() = withVirtualTime { val intervalFlow = flow { delay(1500) emit(1) @@ -116,7 +60,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(3) } - val chunks = intervalFlow.chunked(2.toDuration(DurationUnit.SECONDS), minSize = 0).toList() + val chunks = intervalFlow.chunked(2.seconds, minSize = 0).toList() assertEquals (3, chunks.size) assertTrue { chunks.all { it.size == 1 } } @@ -125,7 +69,7 @@ class ChunkedTest : TestBase() { } @Test - fun testNonContinousWindowsChunking() = withVirtualTime { + fun testDefaultChunkingWithFloatingWindows() = withVirtualTime { val intervalFlow = flow { delay(1500) emit(1) @@ -134,7 +78,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(3) } - val chunks = intervalFlow.chunked(2.toDuration(DurationUnit.SECONDS)).toList() + val chunks = intervalFlow.chunked(2.seconds).toList() assertEquals (2, chunks.size) assertTrue { chunks.first().size == 2 && chunks[1].size == 1 } @@ -152,7 +96,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(3) } - val chunks = 
intervalFlow.chunked(2000, minSize = 3).toList() + val chunks = intervalFlow.chunked(2.seconds, minSize = 3).toList() assertTrue { chunks.size == 1 } assertTrue { chunks.first().size == 3 } @@ -173,7 +117,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(6) } - val chunks = intervalFlow.chunked(2000, minSize = 0, maxSize = 3).toList() + val chunks = intervalFlow.chunked(2.seconds, minSize = 0, maxSize = 3).toList() assertEquals(3, chunks.size) assertEquals(3, chunks.first().size) @@ -196,7 +140,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(6) } - val chunks = intervalFlow.chunked(2000, maxSize = 3).toList() + val chunks = intervalFlow.chunked(2.seconds, maxSize = 3).toList() assertEquals(2, chunks.size) assertEquals(3, chunks.first().size) From 5237f92d9c0e64744b598c5a355b92d1589517f6 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Wed, 23 Dec 2020 15:09:23 +0100 Subject: [PATCH 04/18] Chunk with interval and size only --- .../common/src/flow/operators/Chunked.kt | 191 +++++------------- 1 file changed, 52 insertions(+), 139 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt index aad650a91e..482a29c7b4 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -12,170 +12,83 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* -import kotlin.math.* import kotlin.time.* -private const val NO_MAXIMUM = -1 - -public fun Flow.chunked(maxSize: Int, minSize: Int = 1): Flow> { - require(minSize in 0 until maxSize) - return flow { - val accumulator = mutableListOf() - collect { value -> - accumulator.add(value) - if (accumulator.size == maxSize) emit(accumulator.drain()) - } - if (accumulator.size >= minSize) emit(accumulator) - } +public object ChunkConstraint { + public const val 
NO_MAXIMUM: Int = Int.MAX_VALUE + public const val NO_INTERVAL: Long = Long.MAX_VALUE } @ExperimentalTime public fun Flow.chunked( - chunkDuration: Duration, - minSize: Int = 1, - maxSize: Int = NO_MAXIMUM -): Flow> = chunked(chunkDuration.toDelayMillis(), minSize, maxSize) + interval: Duration, + size: Int +): Flow> = chunked(interval.toLongMilliseconds(), size) public fun Flow.chunked( - chunkDurationMs: Long, - minSize: Int = 1, - maxSize: Int = NO_MAXIMUM + intervalMs: Long, + size: Int ): Flow> { - require(chunkDurationMs > 0) - require(minSize >= 0) - require(maxSize == NO_MAXIMUM || maxSize >= minSize) - - return if (minSize == 0 && maxSize == NO_MAXIMUM) chunkFixedTimeWindows(chunkDurationMs) - else if (minSize == 0) chunkContinousWindows(chunkDurationMs, maxSize) - else chunkFloatingWindows(chunkDurationMs, minSize, maxSize) -} - -private fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = scopedFlow { downstream -> - val upstream = produce(capacity = Channel.CHANNEL_DEFAULT_CAPACITY) { - val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } - launch { - for (tick in ticker) send(Signal.TimeIsUp) - } - collect { value -> send(Signal.NewElement(value)) } - ticker.close() - } - val accumulator = mutableListOf() + require(intervalMs >= 0) + require(size > 0) - for (signal in upstream) { - when (signal) { - is Signal.NewElement -> accumulator.add(signal.value) - is Signal.TimeIsUp -> downstream.emit(accumulator.drain()) - } - } - if (accumulator.isNotEmpty()) downstream.emit(accumulator) + return if(intervalMs != ChunkConstraint.NO_INTERVAL) chunkedTimeBased(intervalMs, size) + else chunkedSizeBased(size) } -private fun Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow> = - scopedFlow { downstream -> - val inbox: ReceiveChannel = this@chunkContinousWindows.produceIn(this) - val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } - val accumulator = mutableListOf() +private fun 
Flow.chunkedTimeBased(intervalMs: Long, size: Int): Flow> = scopedFlow { downstream -> + val buffer = Channel(size) + val emitSemaphore = Channel() + val collectSemaphore = Channel() - whileSelect { - inbox.onReceiveOrClosed.invoke { valueOrClosed -> - val isOpen = !valueOrClosed.isClosed - if (isOpen) { - accumulator.add(valueOrClosed.value) - if(accumulator.size == maxSize){ - ticker.send(Ticker.Message.Reset) - downstream.emit(accumulator.drain()) - ticker.send(Ticker.Message.Start) - } - } - isOpen - } - ticker.onReceive.invoke { - downstream.emit(accumulator.drain()) - true + launch { + collect { value -> + val hasCapacity = buffer.offer(value) + if (!hasCapacity) { + emitSemaphore.send(Unit) + collectSemaphore.receive() + buffer.send(value) } } - - ticker.close() - if (accumulator.isNotEmpty()) downstream.emit(accumulator) + emitSemaphore.close() + buffer.close() } -private fun Flow.chunkFloatingWindows( - durationMs: Long, - minSize: Int, - maxSize: Int, -): Flow> { - - return scopedFlow { downstream -> - val upstream: ReceiveChannel = this@chunkFloatingWindows.produceIn(this) - val ticker = Ticker(durationMs, this) - val accumulator = mutableListOf() - - whileSelect { - upstream.onReceiveOrClosed.invoke { valueOrClosed -> - val isOpen = valueOrClosed.isClosed.not() - if (isOpen) { - if (accumulator.isEmpty()) ticker.send(Ticker.Message.Start) - accumulator.add(valueOrClosed.value) - if (accumulator.size == maxSize) { - ticker.send(Ticker.Message.Reset) - downstream.emit(accumulator.drain()) - } - } - isOpen - } - ticker.onReceive.invoke { - if (accumulator.size >= minSize) downstream.emit(accumulator.drain()) - true - } + whileSelect { + emitSemaphore.onReceiveOrClosed { valueOrClosed -> + buffer.drain().takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } + val shouldCollectNextChunk = valueOrClosed.isClosed.not() + if (shouldCollectNextChunk) collectSemaphore.send(Unit) else collectSemaphore.close() + shouldCollectNextChunk } - - ticker.close() - if 
(accumulator.size >= minSize) downstream.emit(accumulator) - } -} - -private class Ticker( - private val intervalMs: Long, - scope: CoroutineScope, - private val inbox: Channel = Channel(), - private val ticks: Channel = Channel() -) : SendChannel by inbox, ReceiveChannel by ticks { - - init { - scope.processMessages() - } - - private fun CoroutineScope.processMessages() = launch { - var ticker = setupTicks() - for (message in inbox) { - when (message) { - Message.Start -> ticker.start() - Message.Reset -> { - ticker.cancel() - ticker = setupTicks() - } - } + onTimeout(intervalMs) { + downstream.emit(buffer.awaitFirstAndDrain()) + true } - ticker.cancel() - ticks.cancel() } +} - private fun CoroutineScope.setupTicks() = launch(start = CoroutineStart.LAZY) { - while (true) { - delay(intervalMs) - ticks.send(Unit) - } - } +private suspend fun ReceiveChannel.awaitFirstAndDrain(): List { + val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList() + return drain(mutableListOf(first)) +} - sealed class Message { - object Start : Message() - object Reset : Message() +private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf()): List { + val item = poll() + return if (item == null) acc + else { + acc.add(item) + drain(acc) } } -private sealed class Signal { - class NewElement(val value: T) : Signal() - object TimeIsUp : Signal() +private fun Flow.chunkedSizeBased(size: Int): Flow> = flow { + val buffer = mutableListOf() + collect { value -> + buffer.add(value) + if(buffer.size == size) emit(buffer.drain()) + } + if(buffer.isNotEmpty()) emit(buffer) } private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file From 8b8b28e94e516cfc9b2eb3bd74d96e27723774bc Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Fri, 8 Jan 2021 11:33:16 +0100 Subject: [PATCH 05/18] Chunk with interval and size only - part 2 --- .../common/src/flow/operators/Chunked.kt | 36 +++--- 
.../common/test/flow/operators/ChunkedTest.kt | 109 +----------------- 2 files changed, 21 insertions(+), 124 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt index 482a29c7b4..f94dedfb63 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -17,6 +17,7 @@ import kotlin.time.* public object ChunkConstraint { public const val NO_MAXIMUM: Int = Int.MAX_VALUE public const val NO_INTERVAL: Long = Long.MAX_VALUE + public const val NATURAL_BATCHING: Long = 0 } @ExperimentalTime @@ -32,53 +33,48 @@ public fun Flow.chunked( require(intervalMs >= 0) require(size > 0) - return if(intervalMs != ChunkConstraint.NO_INTERVAL) chunkedTimeBased(intervalMs, size) - else chunkedSizeBased(size) + return chunkedTimeBased(intervalMs, size) } private fun Flow.chunkedTimeBased(intervalMs: Long, size: Int): Flow> = scopedFlow { downstream -> val buffer = Channel(size) - val emitSemaphore = Channel() - val collectSemaphore = Channel() + val emitNowSemaphore = Channel() launch { collect { value -> val hasCapacity = buffer.offer(value) if (!hasCapacity) { - emitSemaphore.send(Unit) - collectSemaphore.receive() + emitNowSemaphore.send(Unit) buffer.send(value) } } - emitSemaphore.close() + emitNowSemaphore.close() buffer.close() } whileSelect { - emitSemaphore.onReceiveOrClosed { valueOrClosed -> - buffer.drain().takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } - val shouldCollectNextChunk = valueOrClosed.isClosed.not() - if (shouldCollectNextChunk) collectSemaphore.send(Unit) else collectSemaphore.close() - shouldCollectNextChunk + emitNowSemaphore.onReceiveOrClosed { valueOrClosed -> + buffer.drain(maxElements = size).takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } + valueOrClosed.isClosed.not() } onTimeout(intervalMs) { - downstream.emit(buffer.awaitFirstAndDrain()) + 
downstream.emit(buffer.awaitFirstAndDrain(maxElements = size)) true } } } -private suspend fun ReceiveChannel.awaitFirstAndDrain(): List { +private suspend fun ReceiveChannel.awaitFirstAndDrain(maxElements: Int): List { val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList() - return drain(mutableListOf(first)) + return drain(mutableListOf(first), maxElements) } -private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf()): List { +private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf(), maxElements: Int): List { val item = poll() - return if (item == null) acc + return if (item == null || acc.size == maxElements) acc else { acc.add(item) - drain(acc) + drain(acc, maxElements) } } @@ -86,9 +82,9 @@ private fun Flow.chunkedSizeBased(size: Int): Flow> = flow { val buffer = mutableListOf() collect { value -> buffer.add(value) - if(buffer.size == size) emit(buffer.drain()) + if (buffer.size == size) emit(buffer.drain()) } - if(buffer.isNotEmpty()) emit(buffer) + if (buffer.isNotEmpty()) emit(buffer) } private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt index df7dd66f0f..5ffe767268 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -15,7 +15,7 @@ class ChunkedTest : TestBase() { fun testEmptyFlowChunking() = runTest { val emptyFlow = emptyFlow() val result = measureTimedValue { - emptyFlow.chunked(10.seconds).toList() + emptyFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() } assertTrue { result.value.isEmpty() } @@ -28,7 +28,7 @@ class ChunkedTest : TestBase() { val fastFlow = flow { emit(1) } val result = measureTimedValue { - fastFlow.chunked(10.seconds).toList() + fastFlow.chunked(10.seconds, 
ChunkConstraint.NO_MAXIMUM).toList() } assertTrue { result.value.size == 1 && result.value.first().contains(1) } @@ -43,7 +43,7 @@ class ChunkedTest : TestBase() { } val result = measureTimedValue { - fastFlow.chunked(10.seconds).toList() + fastFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() } assertTrue { result.value.size == 1 && result.value.first().size == 1000 } @@ -51,61 +51,7 @@ class ChunkedTest : TestBase() { } @Test - fun testFixedTimeWindowChunkingWithZeroMinimumSize() = withVirtualTime { - val intervalFlow = flow { - delay(1500) - emit(1) - delay(1500) - emit(2) - delay(1500) - emit(3) - } - val chunks = intervalFlow.chunked(2.seconds, minSize = 0).toList() - - assertEquals (3, chunks.size) - assertTrue { chunks.all { it.size == 1 } } - - finish(1) - } - - @Test - fun testDefaultChunkingWithFloatingWindows() = withVirtualTime { - val intervalFlow = flow { - delay(1500) - emit(1) - delay(1500) - emit(2) - delay(1500) - emit(3) - } - val chunks = intervalFlow.chunked(2.seconds).toList() - - assertEquals (2, chunks.size) - assertTrue { chunks.first().size == 2 && chunks[1].size == 1 } - - finish(1) - } - - @Test - fun testRespectingMinValue() = withVirtualTime { - val intervalFlow = flow { - delay(1500) - emit(1) - delay(1500) - emit(2) - delay(1500) - emit(3) - } - val chunks = intervalFlow.chunked(2.seconds, minSize = 3).toList() - - assertTrue { chunks.size == 1 } - assertTrue { chunks.first().size == 3 } - - finish(1) - } - - @Test - fun testRespectingMaxValueWithContinousWindows() = withVirtualTime { + fun testRespectingSizeAndTimeLimit() = withVirtualTime { val intervalFlow = flow { delay(1500) emit(1) @@ -117,7 +63,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(6) } - val chunks = intervalFlow.chunked(2.seconds, minSize = 0, maxSize = 3).toList() + val chunks = intervalFlow.chunked(2.seconds, size = 3).toList() assertEquals(3, chunks.size) assertEquals(3, chunks.first().size) @@ -127,49 +73,4 @@ class ChunkedTest : TestBase() { 
finish(1) } - @Test - fun testRespectingMaxValueAndResetingTickerWithNonContinousWindows() = withVirtualTime { - val intervalFlow = flow { - delay(1500) - emit(1) - emit(2) - emit(3) - delay(1500) - emit(4) - emit(5) - delay(1500) - emit(6) - } - val chunks = intervalFlow.chunked(2.seconds, maxSize = 3).toList() - - assertEquals(2, chunks.size) - assertEquals(3, chunks.first().size) - assertEquals(3, chunks[1].size) - assertTrue { chunks[1].containsAll(listOf(4, 5, 6)) } - - finish(1) - } - - @Test - fun testSizeBasedChunking() = runTest { - val flow = flow { - for (i in 1..10) emit(i) - } - - val chunks = flow.chunked(maxSize = 3).toList() - - assertEquals(4, chunks.size) - } - - @Test - fun testSizeBasedChunkingWithMinSize() = runTest { - val flow = flow { - for (i in 1..10) emit(i) - } - - val chunks = flow.chunked(maxSize = 3, minSize = 2).toList() - - assertEquals(3, chunks.size) - } - } \ No newline at end of file From 5c5c088a05bb41518fed630da948e5dbff1fb8b8 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Tue, 10 Nov 2020 12:57:32 +0100 Subject: [PATCH 06/18] Add time- and size-based chunking operators --- .../common/src/flow/operators/Chunked.kt | 313 ++++++++++++++++++ .../common/test/flow/operators/ChunkedTest.kt | 231 +++++++++++++ 2 files changed, 544 insertions(+) create mode 100644 kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt new file mode 100644 index 0000000000..5bc80ea9aa --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -0,0 +1,313 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
+ */ + +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.selects.* +import kotlin.jvm.* +import kotlin.math.* +import kotlin.time.* + +private const val NO_MAXIMUM = -1 + +public fun Flow.chunked(maxSize: Int, minSize: Int = 1): Flow> { + require(minSize in 0 until maxSize) + return flow { + val accumulator = mutableListOf() + collect { value -> + accumulator.add(value) + if (accumulator.size == maxSize) emit(accumulator.drain()) + } + if (accumulator.size >= minSize) emit(accumulator) + } +} + +@ExperimentalTime +public fun Flow.chunked( + chunkDuration: Duration, + minSize: Int = 1, + maxSize: Int = NO_MAXIMUM +): Flow> = chunked(chunkDuration.toDelayMillis(), minSize, maxSize) + +public fun Flow.chunked( + chunkDurationMs: Long, + minSize: Int = 1, + maxSize: Int = NO_MAXIMUM +): Flow> { + require(chunkDurationMs > 0) + require(minSize >= 0) + require(maxSize == NO_MAXIMUM || maxSize >= minSize) + + return if (minSize == 0 && maxSize == NO_MAXIMUM) chunkFixedTimeWindows(chunkDurationMs) + else if (minSize == 0) chunkContinousWindows(chunkDurationMs, maxSize) + else chunkFloatingWindows(chunkDurationMs, minSize, maxSize) +} + +public fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = scopedFlow { downstream -> + val upstream = produce(capacity = Channel.CHANNEL_DEFAULT_CAPACITY) { + val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } + launch { + for (tick in ticker) send(Signal.TimeIsUp) + } + collect { value -> send(Signal.NewElement(value)) } + ticker.close() + } + val accumulator = mutableListOf() + + for (signal in upstream) { + when (signal) { + is Signal.NewElement -> accumulator.add(signal.value) + is Signal.TimeIsUp -> downstream.emit(accumulator.drain()) + } + } + if (accumulator.isNotEmpty()) downstream.emit(accumulator) +} + +public fun 
Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow> = + scopedFlow { downstream -> + val inbox: ReceiveChannel = this@chunkContinousWindows.produceIn(this) + val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } + val accumulator = mutableListOf() + + whileSelect { + inbox.onReceiveOrClosed.invoke { valueOrClosed -> + val isOpen = !valueOrClosed.isClosed + if (isOpen) { + accumulator.add(valueOrClosed.value) + if(accumulator.size == maxSize){ + ticker.send(Ticker.Message.Reset) + downstream.emit(accumulator.drain()) + ticker.send(Ticker.Message.Start) + } + } + isOpen + } + ticker.onReceive.invoke { + downstream.emit(accumulator.drain()) + true + } + } + + ticker.close() + if (accumulator.isNotEmpty()) downstream.emit(accumulator) + } + +public fun Flow.chunkFloatingWindows( + durationMs: Long, + minSize: Int, + maxSize: Int, +): Flow> { + + return scopedFlow { downstream -> + val upstream: ReceiveChannel = this@chunkFloatingWindows.produceIn(this) + val ticker = Ticker(durationMs, this) + val accumulator = mutableListOf() + + whileSelect { + upstream.onReceiveOrClosed.invoke { valueOrClosed -> + val isOpen = valueOrClosed.isClosed.not() + if (isOpen) { + if (accumulator.isEmpty()) ticker.send(Ticker.Message.Start) + accumulator.add(valueOrClosed.value) + if (accumulator.size == maxSize) { + ticker.send(Ticker.Message.Reset) + downstream.emit(accumulator.drain()) + } + } + isOpen + } + ticker.onReceive.invoke { + if (accumulator.size >= minSize) downstream.emit(accumulator.drain()) + true + } + } + + ticker.close() + if (accumulator.size >= minSize) downstream.emit(accumulator) + } +} + +private class Ticker( + private val intervalMs: Long, + scope: CoroutineScope, + private val inbox: Channel = Channel(), + private val ticks: Channel = Channel() +) : SendChannel by inbox, ReceiveChannel by ticks { + + init { + scope.processMessages() + } + + private fun CoroutineScope.processMessages() = launch { + var ticker = setupTicks() + for 
(message in inbox) { + when (message) { + Message.Start -> ticker.start() + Message.Reset -> { + ticker.cancel() + ticker = setupTicks() + } + } + } + ticker.cancel() + ticks.cancel() + } + + private fun CoroutineScope.setupTicks() = launch(start = CoroutineStart.LAZY) { + while (true) { + delay(intervalMs) + ticks.send(Unit) + } + } + + sealed class Message { + object Start : Message() + object Reset : Message() + } +} + +public fun Flow.chunkedNonResetableTimer( + durationMs: Long, + fixedIntervals: Boolean = false, + minSize: Int = 0, + maxSize: Int = NO_MAXIMUM, +): Flow> { + require(minSize >= 0) + require(maxSize == NO_MAXIMUM || maxSize >= minSize) + return scopedFlow { downstream -> + val chunker: Chunker = + if (fixedIntervals) EagerChunker(durationMs, minSize, maxSize) + else LazyChunker(durationMs, minSize, maxSize) + + launch { + collect { value -> chunker.send(value) } + chunker.close() + } + downstream.emitAll(chunker.chunks) + } +} + +private sealed class Signal { + class NewElement(val value: T) : Signal() + object TimeIsUp : Signal() +} + +private sealed class Message { + class NewElement(val value: T) : Message() + object TimeIsUp : Message() + object Stop : Message() +} + +private abstract class Chunker { + protected val inbox = Channel>(Channel.CHANNEL_DEFAULT_CAPACITY) + + abstract val chunks: Flow> + + suspend fun send(value: T) { + inbox.send(Message.NewElement(value)) + } + + suspend fun close() { + inbox.send(Message.Stop) + } +} + + +private class LazyChunker( + private val durationMs: Long, + private val minSize: Int, + private val maxSize: Int +) : Chunker() { + + override val chunks = flow { + coroutineScope { + val accumulator = ArrayList() + var timer: Job? 
= null + + for (message in inbox) { + when (message) { + is Message.NewElement -> { + if (accumulator.isEmpty()) timer = startTimer() + accumulator.add(message.value) + if (accumulator.size == maxSize) { + timer?.cancelAndJoin() + emit(accumulator.drain()) + } + } + + Message.TimeIsUp -> { + if (accumulator.size >= minSize) { + emit(accumulator.drain()) + } + } + + Message.Stop -> { + timer?.cancelAndJoin() + inbox.close() + } + } + } + + if (accumulator.size >= max(1, minSize)) emit(accumulator) + } + } + + private fun CoroutineScope.startTimer() = launch { + delay(durationMs) + inbox.send(Message.TimeIsUp) + } +} + +private class EagerChunker( + private val durationMs: Long, + private val minSize: Int, + private val maxSize: Int +) : Chunker() { + + override val chunks: Flow> = flow { + coroutineScope { + val accumulator = ArrayList() + val timer = startTimer() + + for (message in inbox) { + when (message) { + is Message.NewElement -> { + accumulator.add(message.value) + if (accumulator.size == maxSize) { + emit(accumulator.drain()) + } + } + + Message.TimeIsUp -> { + if (accumulator.size >= minSize) { + emit(accumulator.drain()) + } + } + + Message.Stop -> { + timer.cancelAndJoin() + inbox.close() + } + } + } + + if (accumulator.size >= max(1, minSize)) emit(accumulator) + } + } + + private fun CoroutineScope.startTimer() = launch { + while (true) { + delay(durationMs) + inbox.send(Message.TimeIsUp) + } + } +} + +private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt new file mode 100644 index 0000000000..f24c76ecd5 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -0,0 +1,231 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
+ */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* +import kotlin.time.* + +@ExperimentalTime +class ChunkedTest : TestBase() { + + private val testFlow = flow { + delay(10) + for (i in 1..10_000_000) { + emit(i) +// delay(500) +// for (j in 1..100_000) { +// emit(j) +// } + } + } + + private fun Flow.channelTransform() = channelFlow { + val inbox = produceIn(this) + repeat(4) { + launch(Dispatchers.Default) { + for (value in inbox) send(value.toString().toInt() * 2) + } + } + } + + @Test + fun testLazy() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunkedNonResetableTimer(100, false) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun testEager() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunkedNonResetableTimer(100, true) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun testSelectFixedInterval() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunked(100.toDuration(DurationUnit.MILLISECONDS), minSize = 0) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun testSelectFloatingInterval() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunked(100.toDuration(DurationUnit.MILLISECONDS)) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun testChunkNoMaxTime() = runTest { + launch(Dispatchers.Default) { + var emissionsCount = 0 + testFlow.chunkFixedTimeWindows(100) + .onEach { emissionsCount += it.size } + .count() + .let { println("chunks: $it, total emissions: $emissionsCount") } + } + } + + @Test + fun testEmptyFlow() = runTest { 
+ val emptyFlow = emptyFlow() + val result = emptyFlow.chunked(1000).toList() + + assertTrue { result.isEmpty() } + } + + @ExperimentalTime + @Test + fun testSingleFastElement() = runTest { + val fastFlow = flow { emit(1) } + val result = measureTimedValue { + fastFlow.chunked(10_000).toList() + } + + assertTrue { result.value.size == 1 && result.value.first().contains(1) } + assertTrue { result.duration.inSeconds < 1 } + } + + @Test + fun testWindowsChunkingWithNoMinimumSize() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + delay(1500) + emit(2) + delay(1500) + emit(3) + } + val chunks = intervalFlow.chunked(2.toDuration(DurationUnit.SECONDS), minSize = 0).toList() + + assertEquals (3, chunks.size) + assertTrue { chunks.all { it.size == 1 } } + + finish(1) + } + + @Test + fun testNonContinousWindowsChunking() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + delay(1500) + emit(2) + delay(1500) + emit(3) + } + val chunks = intervalFlow.chunked(2.toDuration(DurationUnit.SECONDS)).toList() + + assertEquals (2, chunks.size) + assertTrue { chunks.first().size == 2 && chunks[1].size == 1 } + + finish(1) + } + + @Test + fun testRespectingMinValue() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + delay(1500) + emit(2) + delay(1500) + emit(3) + } + val chunks = intervalFlow.chunked(2000, minSize = 3).toList() + + assertTrue { chunks.size == 1 } + assertTrue { chunks.first().size == 3 } + + finish(1) + } + + @Test + fun testRespectingMaxValueWithContinousWindows() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + emit(2) + emit(3) + emit(4) + delay(1500) + emit(5) + delay(1500) + emit(6) + } + val chunks = intervalFlow.chunked(2000, minSize = 0, maxSize = 3).toList() + + assertEquals(3, chunks.size) + assertEquals(3, chunks.first().size) + assertEquals(2, chunks[1].size) + assertTrue { chunks[1].containsAll(listOf(4, 5)) } + + finish(1) + } + + @Test + fun 
testRespectingMaxValueAndResetingTickerWithNonContinousWindows() = withVirtualTime { + val intervalFlow = flow { + delay(1500) + emit(1) + emit(2) + emit(3) + delay(1500) + emit(4) + emit(5) + delay(1500) + emit(6) + } + val chunks = intervalFlow.chunked(2000, maxSize = 3).toList() + + assertEquals(2, chunks.size) + assertEquals(3, chunks.first().size) + assertEquals(3, chunks[1].size) + assertTrue { chunks[1].containsAll(listOf(4, 5, 6)) } + + finish(1) + } + + @Test + fun testSizeBasedChunking() = runTest { + val flow = flow { + for (i in 1..10) emit(i) + } + + val chunks = flow.chunked(maxSize = 3).toList() + + assertEquals(4, chunks.size) + } + + @Test + fun testSizeBasedChunkingWithMinSize() = runTest { + val flow = flow { + for (i in 1..10) emit(i) + } + + val chunks = flow.chunked(maxSize = 3, minSize = 2).toList() + + assertEquals(3, chunks.size) + } + +} \ No newline at end of file From e04a106dce1355aef538d8536d39c1aa3dce05b8 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Tue, 10 Nov 2020 13:22:31 +0100 Subject: [PATCH 07/18] Remove unused operators --- .../common/src/flow/operators/Chunked.kt | 132 ------------------ 1 file changed, 132 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt index 5bc80ea9aa..92519602b9 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -173,141 +173,9 @@ private class Ticker( } } -public fun Flow.chunkedNonResetableTimer( - durationMs: Long, - fixedIntervals: Boolean = false, - minSize: Int = 0, - maxSize: Int = NO_MAXIMUM, -): Flow> { - require(minSize >= 0) - require(maxSize == NO_MAXIMUM || maxSize >= minSize) - return scopedFlow { downstream -> - val chunker: Chunker = - if (fixedIntervals) EagerChunker(durationMs, minSize, maxSize) - else LazyChunker(durationMs, minSize, maxSize) - - launch { - collect { value -> 
chunker.send(value) } - chunker.close() - } - downstream.emitAll(chunker.chunks) - } -} - private sealed class Signal { class NewElement(val value: T) : Signal() object TimeIsUp : Signal() } -private sealed class Message { - class NewElement(val value: T) : Message() - object TimeIsUp : Message() - object Stop : Message() -} - -private abstract class Chunker { - protected val inbox = Channel>(Channel.CHANNEL_DEFAULT_CAPACITY) - - abstract val chunks: Flow> - - suspend fun send(value: T) { - inbox.send(Message.NewElement(value)) - } - - suspend fun close() { - inbox.send(Message.Stop) - } -} - - -private class LazyChunker( - private val durationMs: Long, - private val minSize: Int, - private val maxSize: Int -) : Chunker() { - - override val chunks = flow { - coroutineScope { - val accumulator = ArrayList() - var timer: Job? = null - - for (message in inbox) { - when (message) { - is Message.NewElement -> { - if (accumulator.isEmpty()) timer = startTimer() - accumulator.add(message.value) - if (accumulator.size == maxSize) { - timer?.cancelAndJoin() - emit(accumulator.drain()) - } - } - - Message.TimeIsUp -> { - if (accumulator.size >= minSize) { - emit(accumulator.drain()) - } - } - - Message.Stop -> { - timer?.cancelAndJoin() - inbox.close() - } - } - } - - if (accumulator.size >= max(1, minSize)) emit(accumulator) - } - } - - private fun CoroutineScope.startTimer() = launch { - delay(durationMs) - inbox.send(Message.TimeIsUp) - } -} - -private class EagerChunker( - private val durationMs: Long, - private val minSize: Int, - private val maxSize: Int -) : Chunker() { - - override val chunks: Flow> = flow { - coroutineScope { - val accumulator = ArrayList() - val timer = startTimer() - - for (message in inbox) { - when (message) { - is Message.NewElement -> { - accumulator.add(message.value) - if (accumulator.size == maxSize) { - emit(accumulator.drain()) - } - } - - Message.TimeIsUp -> { - if (accumulator.size >= minSize) { - emit(accumulator.drain()) - } - } - - 
Message.Stop -> { - timer.cancelAndJoin() - inbox.close() - } - } - } - - if (accumulator.size >= max(1, minSize)) emit(accumulator) - } - } - - private fun CoroutineScope.startTimer() = launch { - while (true) { - delay(durationMs) - inbox.send(Message.TimeIsUp) - } - } -} - private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file From 942b1637a0cde6a20a61d6f8ca8de7e5503b6e51 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Tue, 10 Nov 2020 13:37:05 +0100 Subject: [PATCH 08/18] Add visibility modifiers and clarify tests --- .../common/src/flow/operators/Chunked.kt | 6 +- .../common/test/flow/operators/ChunkedTest.kt | 110 +++++------------- 2 files changed, 30 insertions(+), 86 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt index 92519602b9..aad650a91e 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -50,7 +50,7 @@ public fun Flow.chunked( else chunkFloatingWindows(chunkDurationMs, minSize, maxSize) } -public fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = scopedFlow { downstream -> +private fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = scopedFlow { downstream -> val upstream = produce(capacity = Channel.CHANNEL_DEFAULT_CAPACITY) { val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } launch { @@ -70,7 +70,7 @@ public fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = if (accumulator.isNotEmpty()) downstream.emit(accumulator) } -public fun Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow> = +private fun Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow> = scopedFlow { downstream -> val inbox: ReceiveChannel = this@chunkContinousWindows.produceIn(this) val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } @@ -99,7 +99,7 @@ public 
fun Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Fl if (accumulator.isNotEmpty()) downstream.emit(accumulator) } -public fun Flow.chunkFloatingWindows( +private fun Flow.chunkFloatingWindows( durationMs: Long, minSize: Int, maxSize: Int, diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt index f24c76ecd5..df7dd66f0f 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -11,103 +11,47 @@ import kotlin.time.* @ExperimentalTime class ChunkedTest : TestBase() { - private val testFlow = flow { - delay(10) - for (i in 1..10_000_000) { - emit(i) -// delay(500) -// for (j in 1..100_000) { -// emit(j) -// } - } - } - - private fun Flow.channelTransform() = channelFlow { - val inbox = produceIn(this) - repeat(4) { - launch(Dispatchers.Default) { - for (value in inbox) send(value.toString().toInt() * 2) - } - } - } - - @Test - fun testLazy() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunkedNonResetableTimer(100, false) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: $emissionsCount") } - } - } - @Test - fun testEager() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunkedNonResetableTimer(100, true) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: $emissionsCount") } + fun testEmptyFlowChunking() = runTest { + val emptyFlow = emptyFlow() + val result = measureTimedValue { + emptyFlow.chunked(10.seconds).toList() } - } - @Test - fun testSelectFixedInterval() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunked(100.toDuration(DurationUnit.MILLISECONDS), minSize = 0) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: 
$emissionsCount") } - } + assertTrue { result.value.isEmpty() } + assertTrue { result.duration.inSeconds < 1 } } + @ExperimentalTime @Test - fun testSelectFloatingInterval() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunked(100.toDuration(DurationUnit.MILLISECONDS)) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: $emissionsCount") } - } - } + fun testSingleFastElementChunking() = runTest { + val fastFlow = flow { emit(1) } - @Test - fun testChunkNoMaxTime() = runTest { - launch(Dispatchers.Default) { - var emissionsCount = 0 - testFlow.chunkFixedTimeWindows(100) - .onEach { emissionsCount += it.size } - .count() - .let { println("chunks: $it, total emissions: $emissionsCount") } + val result = measureTimedValue { + fastFlow.chunked(10.seconds).toList() } - } - - @Test - fun testEmptyFlow() = runTest { - val emptyFlow = emptyFlow() - val result = emptyFlow.chunked(1000).toList() - assertTrue { result.isEmpty() } + assertTrue { result.value.size == 1 && result.value.first().contains(1) } + assertTrue { result.duration.inSeconds < 1 } } @ExperimentalTime @Test - fun testSingleFastElement() = runTest { - val fastFlow = flow { emit(1) } + fun testMultipleFastElementsChunking() = runTest { + val fastFlow = flow { + for(i in 1..1000) emit(1) + } + val result = measureTimedValue { - fastFlow.chunked(10_000).toList() + fastFlow.chunked(10.seconds).toList() } - assertTrue { result.value.size == 1 && result.value.first().contains(1) } + assertTrue { result.value.size == 1 && result.value.first().size == 1000 } assertTrue { result.duration.inSeconds < 1 } } @Test - fun testWindowsChunkingWithNoMinimumSize() = withVirtualTime { + fun testFixedTimeWindowChunkingWithZeroMinimumSize() = withVirtualTime { val intervalFlow = flow { delay(1500) emit(1) @@ -116,7 +60,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(3) } - val chunks = intervalFlow.chunked(2.toDuration(DurationUnit.SECONDS), 
minSize = 0).toList() + val chunks = intervalFlow.chunked(2.seconds, minSize = 0).toList() assertEquals (3, chunks.size) assertTrue { chunks.all { it.size == 1 } } @@ -125,7 +69,7 @@ class ChunkedTest : TestBase() { } @Test - fun testNonContinousWindowsChunking() = withVirtualTime { + fun testDefaultChunkingWithFloatingWindows() = withVirtualTime { val intervalFlow = flow { delay(1500) emit(1) @@ -134,7 +78,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(3) } - val chunks = intervalFlow.chunked(2.toDuration(DurationUnit.SECONDS)).toList() + val chunks = intervalFlow.chunked(2.seconds).toList() assertEquals (2, chunks.size) assertTrue { chunks.first().size == 2 && chunks[1].size == 1 } @@ -152,7 +96,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(3) } - val chunks = intervalFlow.chunked(2000, minSize = 3).toList() + val chunks = intervalFlow.chunked(2.seconds, minSize = 3).toList() assertTrue { chunks.size == 1 } assertTrue { chunks.first().size == 3 } @@ -173,7 +117,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(6) } - val chunks = intervalFlow.chunked(2000, minSize = 0, maxSize = 3).toList() + val chunks = intervalFlow.chunked(2.seconds, minSize = 0, maxSize = 3).toList() assertEquals(3, chunks.size) assertEquals(3, chunks.first().size) @@ -196,7 +140,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(6) } - val chunks = intervalFlow.chunked(2000, maxSize = 3).toList() + val chunks = intervalFlow.chunked(2.seconds, maxSize = 3).toList() assertEquals(2, chunks.size) assertEquals(3, chunks.first().size) From a12429eec3df962ccca352fe90082747277c7804 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Wed, 23 Dec 2020 15:09:23 +0100 Subject: [PATCH 09/18] Chunk with interval and size only --- .../common/src/flow/operators/Chunked.kt | 191 +++++------------- 1 file changed, 52 insertions(+), 139 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt 
index aad650a91e..482a29c7b4 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -12,170 +12,83 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* -import kotlin.math.* import kotlin.time.* -private const val NO_MAXIMUM = -1 - -public fun Flow.chunked(maxSize: Int, minSize: Int = 1): Flow> { - require(minSize in 0 until maxSize) - return flow { - val accumulator = mutableListOf() - collect { value -> - accumulator.add(value) - if (accumulator.size == maxSize) emit(accumulator.drain()) - } - if (accumulator.size >= minSize) emit(accumulator) - } +public object ChunkConstraint { + public const val NO_MAXIMUM: Int = Int.MAX_VALUE + public const val NO_INTERVAL: Long = Long.MAX_VALUE } @ExperimentalTime public fun Flow.chunked( - chunkDuration: Duration, - minSize: Int = 1, - maxSize: Int = NO_MAXIMUM -): Flow> = chunked(chunkDuration.toDelayMillis(), minSize, maxSize) + interval: Duration, + size: Int +): Flow> = chunked(interval.toLongMilliseconds(), size) public fun Flow.chunked( - chunkDurationMs: Long, - minSize: Int = 1, - maxSize: Int = NO_MAXIMUM + intervalMs: Long, + size: Int ): Flow> { - require(chunkDurationMs > 0) - require(minSize >= 0) - require(maxSize == NO_MAXIMUM || maxSize >= minSize) - - return if (minSize == 0 && maxSize == NO_MAXIMUM) chunkFixedTimeWindows(chunkDurationMs) - else if (minSize == 0) chunkContinousWindows(chunkDurationMs, maxSize) - else chunkFloatingWindows(chunkDurationMs, minSize, maxSize) -} - -private fun Flow.chunkFixedTimeWindows(durationMs: Long): Flow> = scopedFlow { downstream -> - val upstream = produce(capacity = Channel.CHANNEL_DEFAULT_CAPACITY) { - val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } - launch { - for (tick in ticker) send(Signal.TimeIsUp) - } - collect { value -> send(Signal.NewElement(value)) } - 
ticker.close() - } - val accumulator = mutableListOf() + require(intervalMs >= 0) + require(size > 0) - for (signal in upstream) { - when (signal) { - is Signal.NewElement -> accumulator.add(signal.value) - is Signal.TimeIsUp -> downstream.emit(accumulator.drain()) - } - } - if (accumulator.isNotEmpty()) downstream.emit(accumulator) + return if(intervalMs != ChunkConstraint.NO_INTERVAL) chunkedTimeBased(intervalMs, size) + else chunkedSizeBased(size) } -private fun Flow.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow> = - scopedFlow { downstream -> - val inbox: ReceiveChannel = this@chunkContinousWindows.produceIn(this) - val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) } - val accumulator = mutableListOf() +private fun Flow.chunkedTimeBased(intervalMs: Long, size: Int): Flow> = scopedFlow { downstream -> + val buffer = Channel(size) + val emitSemaphore = Channel() + val collectSemaphore = Channel() - whileSelect { - inbox.onReceiveOrClosed.invoke { valueOrClosed -> - val isOpen = !valueOrClosed.isClosed - if (isOpen) { - accumulator.add(valueOrClosed.value) - if(accumulator.size == maxSize){ - ticker.send(Ticker.Message.Reset) - downstream.emit(accumulator.drain()) - ticker.send(Ticker.Message.Start) - } - } - isOpen - } - ticker.onReceive.invoke { - downstream.emit(accumulator.drain()) - true + launch { + collect { value -> + val hasCapacity = buffer.offer(value) + if (!hasCapacity) { + emitSemaphore.send(Unit) + collectSemaphore.receive() + buffer.send(value) } } - - ticker.close() - if (accumulator.isNotEmpty()) downstream.emit(accumulator) + emitSemaphore.close() + buffer.close() } -private fun Flow.chunkFloatingWindows( - durationMs: Long, - minSize: Int, - maxSize: Int, -): Flow> { - - return scopedFlow { downstream -> - val upstream: ReceiveChannel = this@chunkFloatingWindows.produceIn(this) - val ticker = Ticker(durationMs, this) - val accumulator = mutableListOf() - - whileSelect { - upstream.onReceiveOrClosed.invoke { 
valueOrClosed -> - val isOpen = valueOrClosed.isClosed.not() - if (isOpen) { - if (accumulator.isEmpty()) ticker.send(Ticker.Message.Start) - accumulator.add(valueOrClosed.value) - if (accumulator.size == maxSize) { - ticker.send(Ticker.Message.Reset) - downstream.emit(accumulator.drain()) - } - } - isOpen - } - ticker.onReceive.invoke { - if (accumulator.size >= minSize) downstream.emit(accumulator.drain()) - true - } + whileSelect { + emitSemaphore.onReceiveOrClosed { valueOrClosed -> + buffer.drain().takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } + val shouldCollectNextChunk = valueOrClosed.isClosed.not() + if (shouldCollectNextChunk) collectSemaphore.send(Unit) else collectSemaphore.close() + shouldCollectNextChunk } - - ticker.close() - if (accumulator.size >= minSize) downstream.emit(accumulator) - } -} - -private class Ticker( - private val intervalMs: Long, - scope: CoroutineScope, - private val inbox: Channel = Channel(), - private val ticks: Channel = Channel() -) : SendChannel by inbox, ReceiveChannel by ticks { - - init { - scope.processMessages() - } - - private fun CoroutineScope.processMessages() = launch { - var ticker = setupTicks() - for (message in inbox) { - when (message) { - Message.Start -> ticker.start() - Message.Reset -> { - ticker.cancel() - ticker = setupTicks() - } - } + onTimeout(intervalMs) { + downstream.emit(buffer.awaitFirstAndDrain()) + true } - ticker.cancel() - ticks.cancel() } +} - private fun CoroutineScope.setupTicks() = launch(start = CoroutineStart.LAZY) { - while (true) { - delay(intervalMs) - ticks.send(Unit) - } - } +private suspend fun ReceiveChannel.awaitFirstAndDrain(): List { + val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList() + return drain(mutableListOf(first)) +} - sealed class Message { - object Start : Message() - object Reset : Message() +private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf()): List { + val item = poll() + return if (item == 
null) acc + else { + acc.add(item) + drain(acc) } } -private sealed class Signal { - class NewElement(val value: T) : Signal() - object TimeIsUp : Signal() +private fun Flow.chunkedSizeBased(size: Int): Flow> = flow { + val buffer = mutableListOf() + collect { value -> + buffer.add(value) + if(buffer.size == size) emit(buffer.drain()) + } + if(buffer.isNotEmpty()) emit(buffer) } private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file From da1a57c77f96a1d5405981e401233138fbedaddc Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Fri, 8 Jan 2021 11:33:16 +0100 Subject: [PATCH 10/18] Chunk with interval and size only - part 2 --- .../common/src/flow/operators/Chunked.kt | 36 +++--- .../common/test/flow/operators/ChunkedTest.kt | 109 +----------------- 2 files changed, 21 insertions(+), 124 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt index 482a29c7b4..f94dedfb63 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt @@ -17,6 +17,7 @@ import kotlin.time.* public object ChunkConstraint { public const val NO_MAXIMUM: Int = Int.MAX_VALUE public const val NO_INTERVAL: Long = Long.MAX_VALUE + public const val NATURAL_BATCHING: Long = 0 } @ExperimentalTime @@ -32,53 +33,48 @@ public fun Flow.chunked( require(intervalMs >= 0) require(size > 0) - return if(intervalMs != ChunkConstraint.NO_INTERVAL) chunkedTimeBased(intervalMs, size) - else chunkedSizeBased(size) + return chunkedTimeBased(intervalMs, size) } private fun Flow.chunkedTimeBased(intervalMs: Long, size: Int): Flow> = scopedFlow { downstream -> val buffer = Channel(size) - val emitSemaphore = Channel() - val collectSemaphore = Channel() + val emitNowSemaphore = Channel() launch { collect { value -> val hasCapacity = buffer.offer(value) if (!hasCapacity) { - emitSemaphore.send(Unit) - 
collectSemaphore.receive() + emitNowSemaphore.send(Unit) buffer.send(value) } } - emitSemaphore.close() + emitNowSemaphore.close() buffer.close() } whileSelect { - emitSemaphore.onReceiveOrClosed { valueOrClosed -> - buffer.drain().takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } - val shouldCollectNextChunk = valueOrClosed.isClosed.not() - if (shouldCollectNextChunk) collectSemaphore.send(Unit) else collectSemaphore.close() - shouldCollectNextChunk + emitNowSemaphore.onReceiveOrClosed { valueOrClosed -> + buffer.drain(maxElements = size).takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } + valueOrClosed.isClosed.not() } onTimeout(intervalMs) { - downstream.emit(buffer.awaitFirstAndDrain()) + downstream.emit(buffer.awaitFirstAndDrain(maxElements = size)) true } } } -private suspend fun ReceiveChannel.awaitFirstAndDrain(): List { +private suspend fun ReceiveChannel.awaitFirstAndDrain(maxElements: Int): List { val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList() - return drain(mutableListOf(first)) + return drain(mutableListOf(first), maxElements) } -private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf()): List { +private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf(), maxElements: Int): List { val item = poll() - return if (item == null) acc + return if (item == null || acc.size == maxElements) acc else { acc.add(item) - drain(acc) + drain(acc, maxElements) } } @@ -86,9 +82,9 @@ private fun Flow.chunkedSizeBased(size: Int): Flow> = flow { val buffer = mutableListOf() collect { value -> buffer.add(value) - if(buffer.size == size) emit(buffer.drain()) + if (buffer.size == size) emit(buffer.drain()) } - if(buffer.isNotEmpty()) emit(buffer) + if (buffer.isNotEmpty()) emit(buffer) } private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt 
b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt index df7dd66f0f..5ffe767268 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -15,7 +15,7 @@ class ChunkedTest : TestBase() { fun testEmptyFlowChunking() = runTest { val emptyFlow = emptyFlow() val result = measureTimedValue { - emptyFlow.chunked(10.seconds).toList() + emptyFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() } assertTrue { result.value.isEmpty() } @@ -28,7 +28,7 @@ class ChunkedTest : TestBase() { val fastFlow = flow { emit(1) } val result = measureTimedValue { - fastFlow.chunked(10.seconds).toList() + fastFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() } assertTrue { result.value.size == 1 && result.value.first().contains(1) } @@ -43,7 +43,7 @@ class ChunkedTest : TestBase() { } val result = measureTimedValue { - fastFlow.chunked(10.seconds).toList() + fastFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() } assertTrue { result.value.size == 1 && result.value.first().size == 1000 } @@ -51,61 +51,7 @@ class ChunkedTest : TestBase() { } @Test - fun testFixedTimeWindowChunkingWithZeroMinimumSize() = withVirtualTime { - val intervalFlow = flow { - delay(1500) - emit(1) - delay(1500) - emit(2) - delay(1500) - emit(3) - } - val chunks = intervalFlow.chunked(2.seconds, minSize = 0).toList() - - assertEquals (3, chunks.size) - assertTrue { chunks.all { it.size == 1 } } - - finish(1) - } - - @Test - fun testDefaultChunkingWithFloatingWindows() = withVirtualTime { - val intervalFlow = flow { - delay(1500) - emit(1) - delay(1500) - emit(2) - delay(1500) - emit(3) - } - val chunks = intervalFlow.chunked(2.seconds).toList() - - assertEquals (2, chunks.size) - assertTrue { chunks.first().size == 2 && chunks[1].size == 1 } - - finish(1) - } - - @Test - fun testRespectingMinValue() = withVirtualTime { - val intervalFlow = flow { - delay(1500) - emit(1) - 
delay(1500) - emit(2) - delay(1500) - emit(3) - } - val chunks = intervalFlow.chunked(2.seconds, minSize = 3).toList() - - assertTrue { chunks.size == 1 } - assertTrue { chunks.first().size == 3 } - - finish(1) - } - - @Test - fun testRespectingMaxValueWithContinousWindows() = withVirtualTime { + fun testRespectingSizeAndTimeLimit() = withVirtualTime { val intervalFlow = flow { delay(1500) emit(1) @@ -117,7 +63,7 @@ class ChunkedTest : TestBase() { delay(1500) emit(6) } - val chunks = intervalFlow.chunked(2.seconds, minSize = 0, maxSize = 3).toList() + val chunks = intervalFlow.chunked(2.seconds, size = 3).toList() assertEquals(3, chunks.size) assertEquals(3, chunks.first().size) @@ -127,49 +73,4 @@ class ChunkedTest : TestBase() { finish(1) } - @Test - fun testRespectingMaxValueAndResetingTickerWithNonContinousWindows() = withVirtualTime { - val intervalFlow = flow { - delay(1500) - emit(1) - emit(2) - emit(3) - delay(1500) - emit(4) - emit(5) - delay(1500) - emit(6) - } - val chunks = intervalFlow.chunked(2.seconds, maxSize = 3).toList() - - assertEquals(2, chunks.size) - assertEquals(3, chunks.first().size) - assertEquals(3, chunks[1].size) - assertTrue { chunks[1].containsAll(listOf(4, 5, 6)) } - - finish(1) - } - - @Test - fun testSizeBasedChunking() = runTest { - val flow = flow { - for (i in 1..10) emit(i) - } - - val chunks = flow.chunked(maxSize = 3).toList() - - assertEquals(4, chunks.size) - } - - @Test - fun testSizeBasedChunkingWithMinSize() = runTest { - val flow = flow { - for (i in 1..10) emit(i) - } - - val chunks = flow.chunked(maxSize = 3, minSize = 2).toList() - - assertEquals(3, chunks.size) - } - } \ No newline at end of file From 632d540282a874d1f2b4d12902f3cd75d600e1c7 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Mon, 29 Mar 2021 22:49:03 +0200 Subject: [PATCH 11/18] Prepare Chunking Methods --- .../common/src/flow/operators/Chunked.kt | 90 ---------- .../src/flow/operators/ChunkingMethod.kt | 166 ++++++++++++++++++ 2 files changed, 
166 insertions(+), 90 deletions(-) delete mode 100644 kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt create mode 100644 kotlinx-coroutines-core/common/src/flow/operators/ChunkingMethod.kt diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt deleted file mode 100644 index f94dedfb63..0000000000 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -@file:JvmMultifileClass -@file:JvmName("FlowKt") - -package kotlinx.coroutines.flow - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.flow.internal.* -import kotlinx.coroutines.selects.* -import kotlin.jvm.* -import kotlin.time.* - -public object ChunkConstraint { - public const val NO_MAXIMUM: Int = Int.MAX_VALUE - public const val NO_INTERVAL: Long = Long.MAX_VALUE - public const val NATURAL_BATCHING: Long = 0 -} - -@ExperimentalTime -public fun Flow.chunked( - interval: Duration, - size: Int -): Flow> = chunked(interval.toLongMilliseconds(), size) - -public fun Flow.chunked( - intervalMs: Long, - size: Int -): Flow> { - require(intervalMs >= 0) - require(size > 0) - - return chunkedTimeBased(intervalMs, size) -} - -private fun Flow.chunkedTimeBased(intervalMs: Long, size: Int): Flow> = scopedFlow { downstream -> - val buffer = Channel(size) - val emitNowSemaphore = Channel() - - launch { - collect { value -> - val hasCapacity = buffer.offer(value) - if (!hasCapacity) { - emitNowSemaphore.send(Unit) - buffer.send(value) - } - } - emitNowSemaphore.close() - buffer.close() - } - - whileSelect { - emitNowSemaphore.onReceiveOrClosed { valueOrClosed -> - buffer.drain(maxElements = size).takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } - valueOrClosed.isClosed.not() - } - onTimeout(intervalMs) { - 
downstream.emit(buffer.awaitFirstAndDrain(maxElements = size)) - true - } - } -} - -private suspend fun ReceiveChannel.awaitFirstAndDrain(maxElements: Int): List { - val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList() - return drain(mutableListOf(first), maxElements) -} - -private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf(), maxElements: Int): List { - val item = poll() - return if (item == null || acc.size == maxElements) acc - else { - acc.add(item) - drain(acc, maxElements) - } -} - -private fun Flow.chunkedSizeBased(size: Int): Flow> = flow { - val buffer = mutableListOf() - collect { value -> - buffer.add(value) - if (buffer.size == size) emit(buffer.drain()) - } - if (buffer.isNotEmpty()) emit(buffer) -} - -private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/flow/operators/ChunkingMethod.kt b/kotlinx-coroutines-core/common/src/flow/operators/ChunkingMethod.kt new file mode 100644 index 0000000000..6fb9ffa812 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/operators/ChunkingMethod.kt @@ -0,0 +1,166 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
+ */ + +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.selects.* +import kotlin.jvm.* +import kotlin.time.* + +public fun Flow.chunked(method: ChunkingMethod): Flow> = with(method) { chunk() } + +public interface ChunkingMethod { + public fun Flow.chunk(): Flow> + + public companion object { + public fun Natural(maxSize: Int = Int.MAX_VALUE): ChunkingMethod = TimeBased(0, maxSize) + + public fun ByTime(intervalMs: Long, maxSize: Int = Int.MAX_VALUE): ChunkingMethod = + TimeBased(intervalMs, maxSize) + + public fun BySize(size: Int): ChunkingMethod = SizeBased(size) + + public fun ByTimeOrSize(intervalMs: Long, maxSize: Int): ChunkingMethod = TimeOrSizeBased(intervalMs, maxSize) + } +} + +private class TimeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod { + + override fun Flow.chunk(): Flow> = scopedFlow { downstream -> + val upstream = buffer(maxSize).produceIn(this) + + while (!upstream.isClosedForReceive) { + delay(intervalMs) + val chunk = upstream.awaitFirstAndDrain(maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + } + } +} + +private class SizeBased(private val size: Int) : ChunkingMethod { + + override fun Flow.chunk(): Flow> = flow> { + val accumulator = ArrayList(size) + collect { element -> + accumulator.add(element) + if (accumulator.size == size) emit(accumulator.drain()) + } + if (accumulator.isNotEmpty()) emit(accumulator) + } +} + +private class TimeOrSizeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod { + + override fun Flow.chunk(): Flow> = scopedFlow { downstream -> + val emitNowAndMaybeContinue = Channel() + val elements = produce(capacity = maxSize) { + collect { element -> + val hasCapacity = channel.offer(element) + if (!hasCapacity) { + emitNowAndMaybeContinue.send(true) + channel.send(element) + } + 
} + emitNowAndMaybeContinue.send(false) + } + + whileSelect { + emitNowAndMaybeContinue.onReceive { shouldContinue -> + val chunk = elements.drain(maxElements = maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + shouldContinue + } + + onTimeout(intervalMs) { + val chunk = elements.awaitFirstAndDrain(maxElements = maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + true + } + } + } + +} + +public object ChunkConstraint { + public const val NO_MAXIMUM: Int = Int.MAX_VALUE + public const val NO_INTERVAL: Long = Long.MAX_VALUE + public const val NATURAL_BATCHING: Long = 0 +} + +@ExperimentalTime +public fun Flow.chunkedOld( + interval: Duration, + size: Int +): Flow> = chunkedOld(interval.toLongMilliseconds(), size) + +public fun Flow.chunkedOld( + intervalMs: Long, + size: Int +): Flow> { + require(intervalMs >= 0) + require(size > 0) + + return chunkedTimeBased(intervalMs, size) +} + +private fun Flow.chunkedTimeBased(intervalMs: Long, size: Int): Flow> = scopedFlow { downstream -> + val buffer = Channel(size) + val emitNowSemaphore = Channel() + + launch { + collect { value -> + val hasCapacity = buffer.offer(value) + if (!hasCapacity) { + emitNowSemaphore.send(Unit) + buffer.send(value) + } + } + emitNowSemaphore.close() + buffer.close() + } + + whileSelect { + emitNowSemaphore.onReceiveOrClosed { valueOrClosed -> + buffer.drain(maxElements = size).takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } + valueOrClosed.isClosed.not() + } + onTimeout(intervalMs) { + downstream.emit(buffer.awaitFirstAndDrain(maxElements = size)) + true + } + } +} + +private suspend fun ReceiveChannel.awaitFirstAndDrain(maxElements: Int): List { + val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList() + return drain(mutableListOf(first), maxElements) +} + +private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf(), maxElements: Int): List = + if (acc.size == maxElements) acc + else { + val item = poll() + if (item == 
null) acc + else { + acc.add(item) + drain(acc, maxElements) + } + } + +private fun Flow.chunkedSizeBased(size: Int): Flow> = flow { + val buffer = mutableListOf() + collect { value -> + buffer.add(value) + if (buffer.size == size) emit(buffer.drain()) + } + if (buffer.isNotEmpty()) emit(buffer) +} + +private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file From c3244ff1168c672036e0d488a53a527f896070d0 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Wed, 31 Mar 2021 15:22:05 +0200 Subject: [PATCH 12/18] Add a bunch of tests --- kotlinx-coroutines-core/build.gradle | 10 +- .../operators/{ChunkingMethod.kt => Chunk.kt} | 30 ++- .../common/test/flow/operators/ChunkedTest.kt | 191 ++++++++++++++---- 3 files changed, 188 insertions(+), 43 deletions(-) rename kotlinx-coroutines-core/common/src/flow/operators/{ChunkingMethod.kt => Chunk.kt} (85%) diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle index c2a57f9d9e..e34d540217 100644 --- a/kotlinx-coroutines-core/build.gradle +++ b/kotlinx-coroutines-core/build.gradle @@ -166,11 +166,11 @@ kotlin.sourceSets { task checkJdk16() { // only fail w/o JDK_16 when actually trying to compile, not during project setup phase doLast { - if (!System.env.JDK_16) { - throw new GradleException("JDK_16 environment variable is not defined. " + - "Can't build against JDK 1.6 runtime and run JDK 1.6 compatibility tests. " + - "Please ensure JDK 1.6 is installed and that JDK_16 points to it.") - } +// if (!System.env.JDK_16) { +// throw new GradleException("JDK_16 environment variable is not defined. " + +// "Can't build against JDK 1.6 runtime and run JDK 1.6 compatibility tests. 
" + +// "Please ensure JDK 1.6 is installed and that JDK_16 points to it.") +// } } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/ChunkingMethod.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt similarity index 85% rename from kotlinx-coroutines-core/common/src/flow/operators/ChunkingMethod.kt rename to kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt index 6fb9ffa812..3636cc3cd1 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/ChunkingMethod.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -20,7 +20,7 @@ public interface ChunkingMethod { public fun Flow.chunk(): Flow> public companion object { - public fun Natural(maxSize: Int = Int.MAX_VALUE): ChunkingMethod = TimeBased(0, maxSize) + public fun Natural(maxSize: Int = Int.MAX_VALUE): ChunkingMethod = NaturalChunking(maxSize) public fun ByTime(intervalMs: Long, maxSize: Int = Int.MAX_VALUE): ChunkingMethod = TimeBased(intervalMs, maxSize) @@ -31,19 +31,43 @@ public interface ChunkingMethod { } } -private class TimeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod { +private class NaturalChunking(private val maxSize: Int) : ChunkingMethod { override fun Flow.chunk(): Flow> = scopedFlow { downstream -> val upstream = buffer(maxSize).produceIn(this) while (!upstream.isClosedForReceive) { - delay(intervalMs) val chunk = upstream.awaitFirstAndDrain(maxSize) if (chunk.isNotEmpty()) downstream.emit(chunk) } } } +private class TimeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod { + + override fun Flow.chunk(): Flow> = scopedFlow { downstream -> + val upstreamCollection = Job() + val upstream = produce(capacity = maxSize) { + collect { element -> channel.send(element) } + upstreamCollection.complete() + } + + whileSelect { + upstreamCollection.onJoin { + val chunk = upstream.drain(maxElements = maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + false + } + + 
onTimeout(intervalMs) { + val chunk = upstream.awaitFirstAndDrain(maxElements = maxSize) + if (chunk.isNotEmpty()) downstream.emit(chunk) + true + } + } + } +} + private class SizeBased(private val size: Int) : ChunkingMethod { override fun Flow.chunk(): Flow> = flow> { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt index 5ffe767268..d14fb2e021 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -12,65 +12,186 @@ import kotlin.time.* class ChunkedTest : TestBase() { @Test - fun testEmptyFlowChunking() = runTest { + fun testEmptyFlowSizeBasedChunking() = runTest { val emptyFlow = emptyFlow() - val result = measureTimedValue { - emptyFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() - } - - assertTrue { result.value.isEmpty() } - assertTrue { result.duration.inSeconds < 1 } + val result = emptyFlow.chunked(ChunkingMethod.BySize(5)).toList() + assertTrue(result.isEmpty()) } - @ExperimentalTime @Test - fun testSingleFastElementChunking() = runTest { - val fastFlow = flow { emit(1) } + fun testUndersizedFlowSizeBasedChunking() = runTest { + val undersizeFlow = flow { + for (i in 1..3) emit(i) + } + val result = undersizeFlow.chunked(ChunkingMethod.BySize(5)).toList() + assertEquals(1, result.size) + assertEquals(listOf(1, 2, 3), result.first()) + } - val result = measureTimedValue { - fastFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() + @Test + fun testOversizedFlowSizeBasedChunking() = runTest { + val oversizedFlow = flow { + for (i in 1..10) emit(i) } + val result = oversizedFlow.chunked(ChunkingMethod.BySize(3)).toList() + assertEquals(4, result.size) + assertEquals(3, result.first().size) + assertEquals(1, result[3].size) - assertTrue { result.value.size == 1 && result.value.first().contains(1) } - assertTrue { result.duration.inSeconds < 1 
} } - @ExperimentalTime @Test - fun testMultipleFastElementsChunking() = runTest { - val fastFlow = flow { - for(i in 1..1000) emit(1) - } + fun testEmptyFlowNaturalChunking() = runTest { + val emptyFlow = emptyFlow() + val result = emptyFlow.chunked(ChunkingMethod.Natural()).toList() + assertTrue(result.isEmpty()) + } - val result = measureTimedValue { - fastFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() + @Test + fun testFastCollectorNaturalChunking() = withVirtualTime { + val slowProducer = flow { + for (i in 1..10) { + delay(5) + emit(i) + } } - assertTrue { result.value.size == 1 && result.value.first().size == 1000 } - assertTrue { result.duration.inSeconds < 1 } + val result = slowProducer.chunked(ChunkingMethod.Natural()).toList() + assertEquals(10, result.size) + result.forEach { assertEquals(1, it.size) } + + finish(1) } @Test - fun testRespectingSizeAndTimeLimit() = withVirtualTime { - val intervalFlow = flow { - delay(1500) + fun testSlowCollectorNaturalChunking() = withVirtualTime { + val producerInterval = 5L + val fastProducer = flow { emit(1) + expect(1) + delay(producerInterval) + emit(2) + expect(3) + delay(producerInterval) + emit(3) + expect(4) + delay(producerInterval) + emit(4) - delay(1500) + expect(6) + delay(producerInterval) + emit(5) - delay(1500) - emit(6) + expect(7) + delay(producerInterval) } - val chunks = intervalFlow.chunked(2.seconds, size = 3).toList() - assertEquals(3, chunks.size) - assertEquals(3, chunks.first().size) - assertEquals(2, chunks[1].size) - assertTrue { chunks[1].containsAll(listOf(4, 5)) } + val result = fastProducer.chunked(ChunkingMethod.Natural()).withIndex().onEach { indexed -> + when (indexed.index) { + 0 -> expect(2) + 1 -> expect(5) + 2 -> finish(8) + } + delay(11) + }.toList() - finish(1) + assertEquals(3, result.size) + assertEquals(1, result.first().value.size) + for (i in 1..2) assertEquals(2, result[i].value.size) + } + + @Test + fun testEmptyFlowWithSlowTimeBasedChunking() = runTest { 
+ val emptyFlow = emptyFlow() + val result = measureTimedValue { emptyFlow.chunked(ChunkingMethod.ByTime(intervalMs = 10 * 1000)).toList() } + assertTrue(result.value.isEmpty()) + assertTrue(result.duration < 1000.milliseconds) + } + + @Test + fun testErrorPropagationInTimeBasedChunking() = runTest { + val exception = IllegalArgumentException() + val failedFlow = flow { + emit(1) + emit(2) + throw exception + } + var catchedException: Throwable? = null + + val result = failedFlow + .chunked(ChunkingMethod.ByTime(10 * 10_000)) + .catch { e -> + catchedException = e + emit(listOf(3)) + } + .toList() + + assertTrue(catchedException is IllegalArgumentException) + assertEquals(3, result.first().single()) } +// @Test +// fun testEmptyFlowChunking() = runTest { +// val emptyFlow = emptyFlow() +// val result = measureTimedValue { +// emptyFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() +// } +// +// assertTrue { result.value.isEmpty() } +// assertTrue { result.duration.inSeconds < 1 } +// } +// +// @ExperimentalTime +// @Test +// fun testSingleFastElementChunking() = runTest { +// val fastFlow = flow { emit(1) } +// +// val result = measureTimedValue { +// fastFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() +// } +// +// assertTrue { result.value.size == 1 && result.value.first().contains(1) } +// assertTrue { result.duration.inSeconds < 1 } +// } +// +// @ExperimentalTime +// @Test +// fun testMultipleFastElementsChunking() = runTest { +// val fastFlow = flow { +// for(i in 1..1000) emit(1) +// } +// +// val result = measureTimedValue { +// fastFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() +// } +// +// assertTrue { result.value.size == 1 && result.value.first().size == 1000 } +// assertTrue { result.duration.inSeconds < 1 } +// } +// +// @Test +// fun testRespectingSizeAndTimeLimit() = withVirtualTime { +// val intervalFlow = flow { +// delay(1500) +// emit(1) +// emit(2) +// emit(3) +// emit(4) +// delay(1500) +// emit(5) +// 
delay(1500) +// emit(6) +// } +// val chunks = intervalFlow.chunked(2.seconds, size = 3).toList() +// +// assertEquals(3, chunks.size) +// assertEquals(3, chunks.first().size) +// assertEquals(2, chunks[1].size) +// assertTrue { chunks[1].containsAll(listOf(4, 5)) } +// +// finish(1) +// } + } \ No newline at end of file From 5b5c3bdad16ad71f80b68aeb3fe67a06b6f5adf6 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Fri, 2 Apr 2021 14:24:56 +0200 Subject: [PATCH 13/18] Test Time based chunking --- .../common/test/flow/operators/ChunkedTest.kt | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt index d14fb2e021..de9440f108 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -132,6 +132,86 @@ class ChunkedTest : TestBase() { assertEquals(3, result.first().single()) } + @Test + fun testTimeBasedChunkingOfMultipleElements() = withVirtualTime { + val producer = flow { + for (i in 1..10) { + delay(1000) + emit(i) + } + } + + val result = producer.chunked(ChunkingMethod.ByTime(5500)).toList() + + finish(1) + + assertEquals(2, result.size) + assertEquals(5, result.first().size) + assertEquals(5, result[1].size) + } + + @Test + fun testTimeBasedChunkingWithMaxChunkSizeSuspendingProducer() = runTest { + val producer = flow { + for (i in 1..10) { + emit(i) + } + } + + val result = measureTimedValue { producer.chunked(ChunkingMethod.ByTime(200, maxSize = 5)).toList() } + + finish(1) + + assertEquals(2, result.value.size) + assertEquals(5, result.value.first().size) + assertEquals(5, result.value[1].size) + assertTrue(result.duration >= 200.milliseconds, "expected time at least 400 ms but was: ${result.duration}") + } + + @Test + fun testEmptyFlowTimeOrSizeBasedChunking() = runTest { + val emptyFlow = emptyFlow() + val result 
= measureTimedValue { + emptyFlow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 10 * 1000, maxSize = 5)).toList() + } + assertTrue(result.value.isEmpty()) + assertTrue(result.duration < 500.milliseconds) + } + + @Test + fun testMultipleElementsFillingBufferWithTimeOrSizeBasedChunking() = runTest { + val flow = flow { + for (i in 1..10) { + emit(i) + } + } + val result = measureTimedValue { + flow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 10 * 1000, maxSize = 5)).toList() + } + assertEquals(2, result.value.size) + assertEquals(5, result.value.first().size) + assertEquals(5, result.value[1].size) + assertTrue(result.duration < 500.milliseconds) + } + + @Test + fun testMultipleElementsNotFillingBufferWithTimeOrSizeBasedChunking() = runTest { + val flow = flow { + for (i in 1..10) { + delay(10) + emit(i) + } + } + val result = measureTimedValue { + flow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 55, maxSize = 500)).toList() + } + + assertEquals(2, result.value.size) + assertEquals(5, result.value.first().size) + assertEquals(5, result.value[1].size) + assertTrue(result.duration >= 100.milliseconds) + } + // @Test // fun testEmptyFlowChunking() = runTest { // val emptyFlow = emptyFlow() From 2b9e5d1bcb1a5e3fc9598a64b72a6ab2658967bf Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Tue, 13 Apr 2021 00:39:17 +0200 Subject: [PATCH 14/18] Add docs and last tests --- .../common/src/flow/operators/Chunk.kt | 151 ++++++++++-------- .../common/test/flow/operators/ChunkedTest.kt | 93 ++--------- 2 files changed, 101 insertions(+), 143 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt index 3636cc3cd1..aa75e65e73 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -14,25 +14,88 @@ import kotlinx.coroutines.selects.* import kotlin.jvm.* import kotlin.time.* +/** + * Groups emissions 
from this Flow into lists, according to the chosen ChunkingMethod. Time based implementations + * collect upstream and emit to downstream in separate coroutines - concurrently, like Flow.buffer() operator. + * Exact timing of emissions is not guaranteed, as it depends on collector coroutine availability. + * + * Size based chunking happens in a single coroutine and is purely sequential. + * + * Emissions always preserve order. + * + * It is possible to pass a custom implementation of ChunkingMethod to chunked() operator. + * + * @param method Defines constraints on chunk size and time of its emission. + */ + +@ExperimentalCoroutinesApi public fun Flow.chunked(method: ChunkingMethod): Flow> = with(method) { chunk() } +@ExperimentalCoroutinesApi public interface ChunkingMethod { public fun Flow.chunk(): Flow> public companion object { + + /** + * Collects upstream and emits to downstream in separate coroutines - as soon as possible. If consumer keeps + * up with the producer, it emits lists with a single element. + * + * In case of slow consumer, it groups emissions into bigger lists. When consumer "speeds up", chunks + * will get smaller. + * + * @param maxSize Maximum size of a single chunk. If reached, producer gets suspended until consumer "consumes" + * a chunk. If maxSize is not specified, then chunk may grow indefinitely until JVM runs out of memory. + */ + @Suppress("FunctionName") public fun Natural(maxSize: Int = Int.MAX_VALUE): ChunkingMethod = NaturalChunking(maxSize) + /** + * Collects upstream into a buffer and emits its content as a list at every interval. When upstream completes + * (or is empty), it will try to emit immediately what is left of a chunk, omitting the interval. + * + * @param intervalMs Interval between emissions in milliseconds. Every emission happens only after + * interval passes, unless upstream Flow completes sooner. + * + * @param maxSize Maximum size of a single chunk. 
If reached, producer gets suspended until consumer "consumes" + * a chunk. If maxSize is not specified, then chunk may grow indefinitely until JVM runs out of memory. + */ + @Suppress("FunctionName") public fun ByTime(intervalMs: Long, maxSize: Int = Int.MAX_VALUE): ChunkingMethod = TimeBased(intervalMs, maxSize) - public fun BySize(size: Int): ChunkingMethod = SizeBased(size) - + /** + * Collects upstream into a buffer and emits its content as a list at every interval or when its buffer reaches + * maximum size. When upstream completes (or is empty), it will try to emit immediately what is left of + * a chunk, omitting the interval and maxSize constraints. + * + * @param intervalMs Interval between emissions in milliseconds. Every emission happens only after + * interval passes, unless upstream Flow completes sooner or maximum size of a chunk is reached. + * + * @param maxSize Maximum size of a single chunk. If reached, it will try to emit a chunk, ignoring the + * interval constraint. If that happens, time-to-next-chunk gets reset to the interval value. + */ + @Suppress("FunctionName") public fun ByTimeOrSize(intervalMs: Long, maxSize: Int): ChunkingMethod = TimeOrSizeBased(intervalMs, maxSize) + + /** + * Collects upstream into a buffer and emits its content as a list, when specified size is reached. + * This implementation is purely sequential. If concurrent upstream collection and downstream emissions are + * desired, one can use a buffer() operator after chunking. + * + * @param size Exact size of emitted chunks. Only the last emission may be smaller. 
+ */ + @Suppress("FunctionName") + public fun BySize(size: Int): ChunkingMethod = SizeBased(size) } } private class NaturalChunking(private val maxSize: Int) : ChunkingMethod { + init { + requirePositive(maxSize) + } + override fun Flow.chunk(): Flow> = scopedFlow { downstream -> val upstream = buffer(maxSize).produceIn(this) @@ -45,6 +108,11 @@ private class NaturalChunking(private val maxSize: Int) : ChunkingMethod { private class TimeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod { + init { + requirePositive(intervalMs) + requirePositive(maxSize) + } + override fun Flow.chunk(): Flow> = scopedFlow { downstream -> val upstreamCollection = Job() val upstream = produce(capacity = maxSize) { @@ -60,7 +128,7 @@ private class TimeBased(private val intervalMs: Long, private val maxSize: Int) } onTimeout(intervalMs) { - val chunk = upstream.awaitFirstAndDrain(maxElements = maxSize) + val chunk = upstream.drain(maxElements = maxSize) if (chunk.isNotEmpty()) downstream.emit(chunk) true } @@ -70,7 +138,11 @@ private class TimeBased(private val intervalMs: Long, private val maxSize: Int) private class SizeBased(private val size: Int) : ChunkingMethod { - override fun Flow.chunk(): Flow> = flow> { + init { + requirePositive(size) + } + + override fun Flow.chunk(): Flow> = flow { val accumulator = ArrayList(size) collect { element -> accumulator.add(element) @@ -82,8 +154,13 @@ private class SizeBased(private val size: Int) : ChunkingMethod { private class TimeOrSizeBased(private val intervalMs: Long, private val maxSize: Int) : ChunkingMethod { + init { + requirePositive(intervalMs) + requirePositive(maxSize) + } + override fun Flow.chunk(): Flow> = scopedFlow { downstream -> - val emitNowAndMaybeContinue = Channel() + val emitNowAndMaybeContinue = Channel(capacity = Channel.RENDEZVOUS) val elements = produce(capacity = maxSize) { collect { element -> val hasCapacity = channel.offer(element) @@ -103,7 +180,7 @@ private class 
TimeOrSizeBased(private val intervalMs: Long, private val maxSize: } onTimeout(intervalMs) { - val chunk = elements.awaitFirstAndDrain(maxElements = maxSize) + val chunk = elements.drain(maxElements = maxSize) if (chunk.isNotEmpty()) downstream.emit(chunk) true } @@ -112,56 +189,6 @@ private class TimeOrSizeBased(private val intervalMs: Long, private val maxSize: } -public object ChunkConstraint { - public const val NO_MAXIMUM: Int = Int.MAX_VALUE - public const val NO_INTERVAL: Long = Long.MAX_VALUE - public const val NATURAL_BATCHING: Long = 0 -} - -@ExperimentalTime -public fun Flow.chunkedOld( - interval: Duration, - size: Int -): Flow> = chunkedOld(interval.toLongMilliseconds(), size) - -public fun Flow.chunkedOld( - intervalMs: Long, - size: Int -): Flow> { - require(intervalMs >= 0) - require(size > 0) - - return chunkedTimeBased(intervalMs, size) -} - -private fun Flow.chunkedTimeBased(intervalMs: Long, size: Int): Flow> = scopedFlow { downstream -> - val buffer = Channel(size) - val emitNowSemaphore = Channel() - - launch { - collect { value -> - val hasCapacity = buffer.offer(value) - if (!hasCapacity) { - emitNowSemaphore.send(Unit) - buffer.send(value) - } - } - emitNowSemaphore.close() - buffer.close() - } - - whileSelect { - emitNowSemaphore.onReceiveOrClosed { valueOrClosed -> - buffer.drain(maxElements = size).takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } - valueOrClosed.isClosed.not() - } - onTimeout(intervalMs) { - downstream.emit(buffer.awaitFirstAndDrain(maxElements = size)) - true - } - } -} - private suspend fun ReceiveChannel.awaitFirstAndDrain(maxElements: Int): List { val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList() return drain(mutableListOf(first), maxElements) @@ -178,13 +205,7 @@ private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableLis } } -private fun Flow.chunkedSizeBased(size: Int): Flow> = flow { - val buffer = mutableListOf() - collect { value -> - buffer.add(value) - 
if (buffer.size == size) emit(buffer.drain()) - } - if (buffer.isNotEmpty()) emit(buffer) -} +private fun MutableList.drain() = toList().also { this.clear() } -private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file +private fun requirePositive(size: Int) = require(size > 0) +private fun requirePositive(intervalMs: Long) = require(intervalMs > 0) \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt index de9440f108..8b93fc3a2d 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -20,10 +20,10 @@ class ChunkedTest : TestBase() { @Test fun testUndersizedFlowSizeBasedChunking() = runTest { - val undersizeFlow = flow { + val undersizedFlow = flow { for (i in 1..3) emit(i) } - val result = undersizeFlow.chunked(ChunkingMethod.BySize(5)).toList() + val result = undersizedFlow.chunked(ChunkingMethod.BySize(5)).toList() assertEquals(1, result.size) assertEquals(listOf(1, 2, 3), result.first()) } @@ -107,7 +107,7 @@ class ChunkedTest : TestBase() { val emptyFlow = emptyFlow() val result = measureTimedValue { emptyFlow.chunked(ChunkingMethod.ByTime(intervalMs = 10 * 1000)).toList() } assertTrue(result.value.isEmpty()) - assertTrue(result.duration < 1000.milliseconds) + assertTrue(result.duration < 500.milliseconds) } @Test @@ -165,7 +165,7 @@ class ChunkedTest : TestBase() { assertEquals(2, result.value.size) assertEquals(5, result.value.first().size) assertEquals(5, result.value[1].size) - assertTrue(result.duration >= 200.milliseconds, "expected time at least 400 ms but was: ${result.duration}") + assertTrue(result.duration >= 200.milliseconds, "expected time at least 200 ms but was: ${result.duration}") } @Test @@ -195,83 +195,20 @@ class ChunkedTest : TestBase() { } @Test - fun 
testMultipleElementsNotFillingBufferWithTimeOrSizeBasedChunking() = runTest { - val flow = flow { - for (i in 1..10) { - delay(10) + fun testMultipleElementsNotFillingBufferWithTimeOrSizeBasedChunking() = withVirtualTime { + val flow = flow { + for (i in 1..5) { + delay(500) emit(i) } } - val result = measureTimedValue { - flow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 55, maxSize = 500)).toList() - } + val result = flow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 1100, maxSize = 500)).toList() - assertEquals(2, result.value.size) - assertEquals(5, result.value.first().size) - assertEquals(5, result.value[1].size) - assertTrue(result.duration >= 100.milliseconds) - } - -// @Test -// fun testEmptyFlowChunking() = runTest { -// val emptyFlow = emptyFlow() -// val result = measureTimedValue { -// emptyFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() -// } -// -// assertTrue { result.value.isEmpty() } -// assertTrue { result.duration.inSeconds < 1 } -// } -// -// @ExperimentalTime -// @Test -// fun testSingleFastElementChunking() = runTest { -// val fastFlow = flow { emit(1) } -// -// val result = measureTimedValue { -// fastFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() -// } -// -// assertTrue { result.value.size == 1 && result.value.first().contains(1) } -// assertTrue { result.duration.inSeconds < 1 } -// } -// -// @ExperimentalTime -// @Test -// fun testMultipleFastElementsChunking() = runTest { -// val fastFlow = flow { -// for(i in 1..1000) emit(1) -// } -// -// val result = measureTimedValue { -// fastFlow.chunked(10.seconds, ChunkConstraint.NO_MAXIMUM).toList() -// } -// -// assertTrue { result.value.size == 1 && result.value.first().size == 1000 } -// assertTrue { result.duration.inSeconds < 1 } -// } -// -// @Test -// fun testRespectingSizeAndTimeLimit() = withVirtualTime { -// val intervalFlow = flow { -// delay(1500) -// emit(1) -// emit(2) -// emit(3) -// emit(4) -// delay(1500) -// emit(5) -// delay(1500) -// 
emit(6) -// } -// val chunks = intervalFlow.chunked(2.seconds, size = 3).toList() -// -// assertEquals(3, chunks.size) -// assertEquals(3, chunks.first().size) -// assertEquals(2, chunks[1].size) -// assertTrue { chunks[1].containsAll(listOf(4, 5)) } -// -// finish(1) -// } + assertEquals(3, result.size) + assertEquals(2, result.first().size) + assertEquals(2, result[1].size) + assertEquals(1, result[2].size) + finish(1) + } } \ No newline at end of file From 9cb86f9138a6299c1861f1c6fbe9b21a4714da8c Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Wed, 14 Apr 2021 23:00:05 +0200 Subject: [PATCH 15/18] Add test for error propagation in Natural Chunking --- .../common/src/flow/operators/Chunk.kt | 1 + .../common/test/flow/operators/ChunkedTest.kt | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt index aa75e65e73..aad9fdf339 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -208,4 +208,5 @@ private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableLis private fun MutableList.drain() = toList().also { this.clear() } private fun requirePositive(size: Int) = require(size > 0) + private fun requirePositive(intervalMs: Long) = require(intervalMs > 0) \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt index 8b93fc3a2d..1b70eee4be 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -102,6 +102,28 @@ class ChunkedTest : TestBase() { for (i in 1..2) assertEquals(2, result[i].value.size) } + @Test + fun testErrorPropagationInNaturalChunking() = runTest { + val exception = IllegalArgumentException() + val failedFlow = 
flow { + emit(1) + emit(2) + throw exception + } + var catchedException: Throwable? = null + + val result = failedFlow + .chunked(ChunkingMethod.Natural()) + .catch { e -> + catchedException = e + emit(listOf(3)) + } + .toList() + + assertTrue(catchedException is IllegalArgumentException) + assertEquals(3, result.first().single()) + } + @Test fun testEmptyFlowWithSlowTimeBasedChunking() = runTest { val emptyFlow = emptyFlow() From d996a9b8ec72f81e9405465e9b54bcc4a7ed43c7 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Wed, 14 Apr 2021 23:03:22 +0200 Subject: [PATCH 16/18] Enable for JDK 1.6 --- kotlinx-coroutines-core/build.gradle | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle index e34d540217..c2a57f9d9e 100644 --- a/kotlinx-coroutines-core/build.gradle +++ b/kotlinx-coroutines-core/build.gradle @@ -166,11 +166,11 @@ kotlin.sourceSets { task checkJdk16() { // only fail w/o JDK_16 when actually trying to compile, not during project setup phase doLast { -// if (!System.env.JDK_16) { -// throw new GradleException("JDK_16 environment variable is not defined. " + -// "Can't build against JDK 1.6 runtime and run JDK 1.6 compatibility tests. " + -// "Please ensure JDK 1.6 is installed and that JDK_16 points to it.") -// } + if (!System.env.JDK_16) { + throw new GradleException("JDK_16 environment variable is not defined. " + + "Can't build against JDK 1.6 runtime and run JDK 1.6 compatibility tests. 
" + + "Please ensure JDK 1.6 is installed and that JDK_16 points to it.") + } } } From 3fb6939374937a11443cab499e955d1ff4699196 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Thu, 15 Apr 2021 22:44:18 +0200 Subject: [PATCH 17/18] Adjust for changes in Channel API --- .../common/src/flow/operators/Chunk.kt | 21 +++-- .../common/src/flow/operators/Chunked.kt | 90 ------------------- 2 files changed, 10 insertions(+), 101 deletions(-) delete mode 100644 kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt index aad9fdf339..b0a326391c 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -12,7 +12,6 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* -import kotlin.time.* /** * Groups emissions from this Flow into lists, according to the chosen ChunkingMethod. 
Time based implementations @@ -163,7 +162,7 @@ private class TimeOrSizeBased(private val intervalMs: Long, private val maxSize: val emitNowAndMaybeContinue = Channel(capacity = Channel.RENDEZVOUS) val elements = produce(capacity = maxSize) { collect { element -> - val hasCapacity = channel.offer(element) + val hasCapacity = channel.trySend(element).isSuccess if (!hasCapacity) { emitNowAndMaybeContinue.send(true) channel.send(element) @@ -189,20 +188,20 @@ private class TimeOrSizeBased(private val intervalMs: Long, private val maxSize: } -private suspend fun ReceiveChannel.awaitFirstAndDrain(maxElements: Int): List { - val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList() - return drain(mutableListOf(first), maxElements) +private suspend fun ReceiveChannel.awaitFirstAndDrain(maxElements: Int): List = try { + val first = receive() + drain(mutableListOf(first), maxElements) +} catch (e: ClosedReceiveChannelException) { + emptyList() } + private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf(), maxElements: Int): List = if (acc.size == maxElements) acc else { - val item = poll() - if (item == null) acc - else { - acc.add(item) - drain(acc, maxElements) - } + val nextValue = tryReceive().getOrElse { error: Throwable? -> error?.let { throw(it) } ?: return acc } + acc.add(nextValue) + drain(acc, maxElements) } private fun MutableList.drain() = toList().also { this.clear() } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt deleted file mode 100644 index f94dedfb63..0000000000 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
- */ - -@file:JvmMultifileClass -@file:JvmName("FlowKt") - -package kotlinx.coroutines.flow - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.flow.internal.* -import kotlinx.coroutines.selects.* -import kotlin.jvm.* -import kotlin.time.* - -public object ChunkConstraint { - public const val NO_MAXIMUM: Int = Int.MAX_VALUE - public const val NO_INTERVAL: Long = Long.MAX_VALUE - public const val NATURAL_BATCHING: Long = 0 -} - -@ExperimentalTime -public fun Flow.chunked( - interval: Duration, - size: Int -): Flow> = chunked(interval.toLongMilliseconds(), size) - -public fun Flow.chunked( - intervalMs: Long, - size: Int -): Flow> { - require(intervalMs >= 0) - require(size > 0) - - return chunkedTimeBased(intervalMs, size) -} - -private fun Flow.chunkedTimeBased(intervalMs: Long, size: Int): Flow> = scopedFlow { downstream -> - val buffer = Channel(size) - val emitNowSemaphore = Channel() - - launch { - collect { value -> - val hasCapacity = buffer.offer(value) - if (!hasCapacity) { - emitNowSemaphore.send(Unit) - buffer.send(value) - } - } - emitNowSemaphore.close() - buffer.close() - } - - whileSelect { - emitNowSemaphore.onReceiveOrClosed { valueOrClosed -> - buffer.drain(maxElements = size).takeIf { it.isNotEmpty() }?.let { downstream.emit(it) } - valueOrClosed.isClosed.not() - } - onTimeout(intervalMs) { - downstream.emit(buffer.awaitFirstAndDrain(maxElements = size)) - true - } - } -} - -private suspend fun ReceiveChannel.awaitFirstAndDrain(maxElements: Int): List { - val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList() - return drain(mutableListOf(first), maxElements) -} - -private tailrec fun ReceiveChannel.drain(acc: MutableList = mutableListOf(), maxElements: Int): List { - val item = poll() - return if (item == null || acc.size == maxElements) acc - else { - acc.add(item) - drain(acc, maxElements) - } -} - -private fun Flow.chunkedSizeBased(size: Int): Flow> = flow { - val 
buffer = mutableListOf() - collect { value -> - buffer.add(value) - if (buffer.size == size) emit(buffer.drain()) - } - if (buffer.isNotEmpty()) emit(buffer) -} - -private fun MutableList.drain() = toList().also { this.clear() } \ No newline at end of file From 7431426d281ba06526e88b5df4376c1e06f90f91 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Thu, 15 Apr 2021 23:13:29 +0200 Subject: [PATCH 18/18] New Api dump --- .../api/kotlinx-coroutines-core.api | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 23f29b02a3..ce5224dd85 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -860,6 +860,20 @@ public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/ public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public abstract interface class kotlinx/coroutines/flow/ChunkingMethod { + public static final field Companion Lkotlinx/coroutines/flow/ChunkingMethod$Companion; + public abstract fun chunk (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; +} + +public final class kotlinx/coroutines/flow/ChunkingMethod$Companion { + public final fun BySize (I)Lkotlinx/coroutines/flow/ChunkingMethod; + public final fun ByTime (JI)Lkotlinx/coroutines/flow/ChunkingMethod; + public static synthetic fun ByTime$default (Lkotlinx/coroutines/flow/ChunkingMethod$Companion;JIILjava/lang/Object;)Lkotlinx/coroutines/flow/ChunkingMethod; + public final fun ByTimeOrSize (JI)Lkotlinx/coroutines/flow/ChunkingMethod; + public final fun Natural (I)Lkotlinx/coroutines/flow/ChunkingMethod; + public static synthetic fun Natural$default (Lkotlinx/coroutines/flow/ChunkingMethod$Companion;IILjava/lang/Object;)Lkotlinx/coroutines/flow/ChunkingMethod; +} + public abstract 
interface class kotlinx/coroutines/flow/Flow { public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -894,6 +908,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun chunked (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/ChunkingMethod;)Lkotlinx/coroutines/flow/Flow; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;