Skip to content

Commit 9cb51b2

Browse files
committed
Add Flow.concatMapEager
1 parent b40f956 commit 9cb51b2

File tree

6 files changed

+223
-2
lines changed

6 files changed

+223
-2
lines changed

README.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Extensions to the Kotlin Flow library.
1111

1212
```groovy
1313
dependencies {
14-
implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.9"
14+
implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.10"
1515
}
1616
```
1717

@@ -39,6 +39,7 @@ Table of contents
3939
- `Flow.takeUntil`
4040
- `Flow.onBackpressureDrop`
4141
- [`Flow.flatMapDrop`](#flowflatmapdrop)
42+
- [`Flow.concatMapEager`](#flowconcatmapeager)
4243
- `ParallelFlow` operators (`FlowExtensions`)
4344
- `ParallelFlow.concatMap`
4445
- `ParallelFlow.filter`
@@ -280,4 +281,31 @@ concatArrayEager(
280281
range(6, 5)
281282
)
282283
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
284+
```
285+
286+
## Flow.concatMapEager
287+
288+
Maps the upstream values into [Flow]s and launches them all at once, then
289+
emits items from a source before items of the next are emitted.
290+
291+
For example, given two inner sources, if the first is slow, the items of the second won't be emitted until the first has
292+
finished emitting its items. This operators allows all sources to generate items in parallel but then still emit those
293+
items in the order their respective `Flow`s are mapped in.
294+
295+
Note that the upstream and each source is consumed in an unbounded manner and thus,
296+
depending on the speed of the current source and the collector, the operator may retain
297+
items longer and may use more memory during its execution.
298+
299+
```kotlin
300+
range(1, 5)
301+
.concatMapEager {
302+
range(it * 10, 5).onEach { delay(100) }
303+
}
304+
.assertResult(
305+
10, 11, 12, 13, 14,
306+
20, 21, 22, 23, 24,
307+
30, 31, 32, 33, 34,
308+
40, 41, 42, 43, 44,
309+
50, 51, 52, 53, 54
310+
)
283311
```

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
GROUP=com.github.akarnokd
2-
VERSION_NAME=0.0.9
2+
VERSION_NAME=0.0.10
33

44
POM_ARTIFACT_ID=kotlin-flow-extensions
55
POM_NAME=Kotlin Flow Extensions

src/main/kotlin/hu/akarnokd/kotlin/flow/FlowExtensions.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,17 @@ fun <T> mergeArray(vararg sources: Flow<T>) : Flow<T> = FlowMergeArray(sources)
220220
@FlowPreview
221221
fun <T> concatArrayEager(vararg sources: Flow<T>) : Flow<T> = FlowConcatArrayEager(sources)
222222

223+
/**
224+
* Maps the upstream values into [Flow]s and launches them all at once, then
225+
* emits items from a source before items of the next are emitted.
226+
* Note that the upstream and each source is consumed in an unbounded manner and thus,
227+
* depending on the speed of the current source and the collector, the operator may retain
228+
* items longer and may use more memory during its execution.
229+
* @param mapper the suspendable function to turn an upstream item into a [Flow]
230+
*/
231+
@FlowPreview
232+
fun <T, R> Flow<T>.concatMapEager(mapper: suspend (T) -> Flow<R>) : Flow<R> = FlowConcatMapEager(this, mapper)
233+
223234
// -----------------------------------------------------------------------------------------
224235
// Parallel Extensions
225236
// -----------------------------------------------------------------------------------------
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2019-2020 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.kotlin.flow.impl
18+
19+
import hu.akarnokd.kotlin.flow.Resumable
20+
import kotlinx.coroutines.FlowPreview
21+
import kotlinx.coroutines.coroutineScope
22+
import kotlinx.coroutines.flow.AbstractFlow
23+
import kotlinx.coroutines.flow.Flow
24+
import kotlinx.coroutines.flow.FlowCollector
25+
import kotlinx.coroutines.flow.collect
26+
import kotlinx.coroutines.isActive
27+
import kotlinx.coroutines.launch
28+
import java.util.concurrent.ConcurrentLinkedQueue
29+
import java.util.concurrent.atomic.AtomicBoolean
30+
31+
@FlowPreview
32+
class FlowConcatMapEager<T, R>(private val source: Flow<T>, private val mapper : suspend (t: T) -> Flow<R>) : AbstractFlow<R>() {
33+
override suspend fun collectSafely(collector: FlowCollector<R>) {
34+
coroutineScope {
35+
val resumeOutput = Resumable()
36+
val innerQueue = ConcurrentLinkedQueue<InnerQueue<R>>()
37+
val innerDone = AtomicBoolean()
38+
39+
launch {
40+
try {
41+
source.collect {
42+
val f = mapper(it);
43+
val iq = InnerQueue<R>()
44+
innerQueue.offer(iq);
45+
resumeOutput.resume()
46+
launch {
47+
try {
48+
f.collect {
49+
iq.queue.offer(it)
50+
resumeOutput.resume()
51+
}
52+
} finally {
53+
iq.done.set(true)
54+
resumeOutput.resume()
55+
}
56+
}
57+
}
58+
} finally {
59+
innerDone.set(true)
60+
resumeOutput.resume()
61+
}
62+
}
63+
64+
var iq : InnerQueue<R>? = null
65+
66+
while (isActive) {
67+
68+
if (iq == null) {
69+
val id = innerDone.get()
70+
iq = innerQueue.poll()
71+
72+
if (id && iq == null) {
73+
break
74+
}
75+
}
76+
77+
if (iq != null) {
78+
val d = iq.done.get()
79+
val v = iq.queue.poll()
80+
81+
if (d && v == null) {
82+
iq = null
83+
continue
84+
}
85+
86+
if (v != null) {
87+
collector.emit(v)
88+
continue
89+
}
90+
}
91+
resumeOutput.await()
92+
}
93+
}
94+
}
95+
96+
class InnerQueue<R> {
97+
val queue = ConcurrentLinkedQueue<R>()
98+
val done = AtomicBoolean()
99+
}
100+
}

src/test/kotlin/hu/akarnokd/kotlin/flow/impl/FlowConcatArrayEagerTest.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,20 @@ class FlowConcatArrayEagerTest {
5151
.take(6)
5252
.assertResult(1, 2, 3, 4, 5, 6)
5353
}
54+
55+
@Test
56+
fun cancel() = runBlocking() {
57+
var counter = AtomicInteger()
58+
concatArrayEager(
59+
range(1, 5).onEach {
60+
delay(200)
61+
counter.getAndIncrement()
62+
}
63+
)
64+
.take(3)
65+
.assertResult(1, 2, 3)
66+
67+
delay(1200)
68+
assertEquals(3, counter.get())
69+
}
5470
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package hu.akarnokd.kotlin.flow.impl
2+
3+
import hu.akarnokd.kotlin.flow.*
4+
import kotlinx.coroutines.FlowPreview
5+
import kotlinx.coroutines.delay
6+
import kotlinx.coroutines.flow.flow
7+
import kotlinx.coroutines.flow.onEach
8+
import kotlinx.coroutines.flow.onStart
9+
import kotlinx.coroutines.flow.take
10+
import kotlinx.coroutines.runBlocking
11+
import org.junit.Test
12+
import java.lang.IllegalArgumentException
13+
import java.lang.RuntimeException
14+
import java.util.concurrent.atomic.AtomicInteger
15+
import kotlin.test.assertEquals
16+
17+
@FlowPreview
18+
class FlowConcatMapEagerTest {
19+
@Test
20+
fun basic() = runBlocking {
21+
range(1, 5)
22+
.concatMapEager {
23+
range(it * 10, 5).onEach { delay(100) }
24+
}
25+
.assertResult(
26+
10, 11, 12, 13, 14,
27+
20, 21, 22, 23, 24,
28+
30, 31, 32, 33, 34,
29+
40, 41, 42, 43, 44,
30+
50, 51, 52, 53, 54
31+
)
32+
}
33+
34+
@Test
35+
fun take() = runBlocking {
36+
range(1, 5)
37+
.concatMapEager {
38+
range(it * 10, 5).onEach { delay(100) }
39+
}
40+
.take(7)
41+
.assertResult(
42+
10, 11, 12, 13, 14,
43+
20, 21
44+
)
45+
}
46+
47+
@Test
48+
fun crashMapper() = runBlocking {
49+
range(1, 5)
50+
.concatMapEager<Int, Int> {
51+
throw IllegalArgumentException()
52+
}
53+
.assertFailure<Int, IllegalArgumentException>(IllegalArgumentException::class.java)
54+
}
55+
56+
@Test
57+
fun crashInner() = runBlocking {
58+
range(1, 1)
59+
.concatMapEager<Int, Int> {
60+
flow {
61+
throw IllegalArgumentException()
62+
}
63+
}
64+
.assertFailure<Int, IllegalArgumentException>(IllegalArgumentException::class.java)
65+
}
66+
}

0 commit comments

Comments
 (0)