Skip to content

Commit cfdd0e0

Browse files
committed
Add Flow.amb
1 parent e115e95 commit cfdd0e0

File tree

4 files changed

+136
-0
lines changed

4 files changed

+136
-0
lines changed

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Table of contents
4040
- `Flow.onBackpressureDrop`
4141
- [`Flow.flatMapDrop`](#flowflatmapdrop)
4242
- [`Flow.concatMapEager`](#flowconcatmapeager)
43+
- [`Flow.amb`](#flowamb)
4344
- `ParallelFlow` operators (`FlowExtensions`)
4445
- `ParallelFlow.concatMap`
4546
- `ParallelFlow.filter`
@@ -308,4 +309,17 @@ range(1, 5)
308309
40, 41, 42, 43, 44,
309310
50, 51, 52, 53, 54
310311
)
312+
```
313+
314+
## Flow.amb
315+
316+
Starts collecting all source [Flow]s and relays the items of the first one to emit an item,
317+
cancelling the rest.
318+
319+
```kotlin
320+
amb(
321+
range(1, 5).onStart { delay(1000) },
322+
range(6, 5).onStart { delay(100) }
323+
)
324+
.assertResult(6, 7, 8, 9, 10)
311325
```

src/main/kotlin/hu/akarnokd/kotlin/flow/FlowExtensions.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
2222
import kotlinx.coroutines.FlowPreview
2323
import kotlinx.coroutines.delay
2424
import kotlinx.coroutines.flow.*
25+
import java.util.*
2526
import java.util.concurrent.TimeUnit
27+
import kotlin.collections.ArrayList
2628

2729
/**
2830
* Shares a single collector towards the upstream source and multicasts
@@ -231,6 +233,22 @@ fun <T> concatArrayEager(vararg sources: Flow<T>) : Flow<T> = FlowConcatArrayEag
231233
@FlowPreview
232234
fun <T, R> Flow<T>.concatMapEager(mapper: suspend (T) -> Flow<R>) : Flow<R> = FlowConcatMapEager(this, mapper)
233235

236+
/**
237+
* Starts collecting all source [Flow]s and relays the items of the first one to emit an item,
238+
* cancelling the rest.
239+
* @param sources the [Iterable] sequence of [Flow]s
240+
*/
241+
@FlowPreview
242+
fun <T> amb(sources: Iterable<Flow<T>>) : Flow<T> = FlowAmbIterable(sources)
243+
244+
/**
245+
* Starts collecting all source [Flow]s and relays the items of the first one to emit an item,
246+
* cancelling the rest.
247+
* @param sources the array of [Flow]s
248+
*/
249+
@FlowPreview
250+
fun <T> amb(vararg sources: Flow<T>) : Flow<T> = FlowAmbIterable(listOf(*sources))
251+
234252
// -----------------------------------------------------------------------------------------
235253
// Parallel Extensions
236254
// -----------------------------------------------------------------------------------------
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package hu.akarnokd.kotlin.flow.impl
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.flow.Flow
5+
import kotlinx.coroutines.flow.FlowCollector
6+
import kotlinx.coroutines.flow.collect
7+
import java.util.concurrent.ConcurrentHashMap
8+
import java.util.concurrent.atomic.AtomicInteger
9+
10+
@FlowPreview
11+
class FlowAmbIterable<T>(private val sources: Iterable<Flow<T>>) : Flow<T> {
12+
@InternalCoroutinesApi
13+
override suspend fun collect(collector: FlowCollector<T>) {
14+
val winner = AtomicInteger()
15+
val jobs = ConcurrentHashMap<Job, Int>()
16+
coroutineScope {
17+
var i = 1
18+
for (source in sources) {
19+
val idx = i
20+
val job = launch {
21+
source.collect {
22+
val w = winner.get()
23+
if (w == idx) {
24+
collector.emit(it)
25+
} else if (w == 0 && winner.compareAndSet(0, idx)) {
26+
for (j in jobs.entries) {
27+
if (j.value != idx) {
28+
j.key.cancel()
29+
}
30+
}
31+
32+
collector.emit(it)
33+
} else {
34+
throw CancellationException()
35+
}
36+
}
37+
}
38+
39+
jobs[job] = i
40+
val w = winner.get()
41+
if (w != 0 && w != i) {
42+
jobs.remove(job)
43+
job.cancel()
44+
break
45+
}
46+
47+
i++
48+
}
49+
}
50+
}
51+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package hu.akarnokd.kotlin.flow.impl
2+
3+
import hu.akarnokd.kotlin.flow.amb
4+
import hu.akarnokd.kotlin.flow.assertResult
5+
import hu.akarnokd.kotlin.flow.concatArrayEager
6+
import hu.akarnokd.kotlin.flow.range
7+
import kotlinx.coroutines.FlowPreview
8+
import kotlinx.coroutines.delay
9+
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.runBlocking
11+
import org.junit.Test
12+
import java.util.concurrent.atomic.AtomicInteger
13+
import kotlin.test.assertEquals
14+
15+
@FlowPreview
16+
class FlowAmbIterableTest {
17+
@Test
18+
fun basic1() = runBlocking {
19+
amb(
20+
range(1, 5).onStart { delay(1000) },
21+
range(6, 5).onStart { delay(100) }
22+
)
23+
.assertResult(6, 7, 8, 9, 10)
24+
}
25+
26+
@Test
27+
fun basic2() = runBlocking {
28+
amb(
29+
range(1, 5).onStart { delay(100) },
30+
range(6, 5).onStart { delay(1000) }
31+
)
32+
.assertResult(1, 2, 3, 4, 5)
33+
}
34+
35+
@Test
36+
fun basic3() = runBlocking {
37+
val counter = AtomicInteger()
38+
amb(
39+
range(1, 5).onEach { delay(100) },
40+
range(6, 5)
41+
.onEach {
42+
delay(200)
43+
counter.getAndIncrement()
44+
}
45+
46+
)
47+
.take(3)
48+
.assertResult(1, 2, 3)
49+
50+
assertEquals(0, counter.get())
51+
}
52+
53+
}

0 commit comments

Comments
 (0)