Skip to content

Commit 60e611c

Browse files
committed
Fix subject cancellation, add FlatMapDrop
1 parent 1b58d12 commit 60e611c

14 files changed

+614
-12
lines changed

README.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Extensions to the Kotlin Flow library.
1111

1212
```groovy
1313
dependencies {
14-
implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.2"
14+
implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.3"
1515
}
1616
```
1717

@@ -34,6 +34,8 @@ Table of contents
3434
- `Flow.replay`
3535
- `Flow.startCollectOn`
3636
- `Flow.takeUntil`
37+
- `Flow.onBackpressureDrop`
38+
- [`Flow.flatMapDrop`](#flowflatmapdrop)
3739
- `ParallelFlow` operators (`FlowExtensions`)
3840
- `ParallelFlow.concatMap`
3941
- `ParallelFlow.filter`
@@ -150,3 +152,33 @@ runBlocking {
150152
job.join()
151153
}
152154
```
155+
156+
## Flow.flatMapDrop
157+
158+
Maps the upstream value into a `Flow` and relays its items while ignoring further upstream items until the current
159+
inner `Flow` completes.
160+
161+
```kotlin
162+
import hu.akarnokd.kotlin.flow.*
163+
164+
range(1, 10)
165+
.map {
166+
delay(100)
167+
it
168+
}
169+
.flatMapDrop {
170+
range(it * 100, 5)
171+
.map {
172+
delay(30)
173+
it
174+
}
175+
}
176+
.assertResult(
177+
100, 101, 102, 103, 104,
178+
300, 301, 302, 303, 304,
179+
500, 501, 502, 503, 504,
180+
700, 701, 702, 703, 704,
181+
900, 901, 902, 903, 904
182+
)
183+
```
184+

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
GROUP=com.github.akarnokd
2-
VERSION_NAME=0.0.2
2+
VERSION_NAME=0.0.3
33

44
POM_ARTIFACT_ID=kotlin-flow-extensions
55
POM_NAME=Kotlin Flow Extensions

src/main/kotlin/hu/akarnokd/kotlin/flow/BehaviorSubject.kt

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616

1717
package hu.akarnokd.kotlin.flow
1818

19+
import kotlinx.coroutines.CancellationException
1920
import kotlinx.coroutines.FlowPreview
2021
import kotlinx.coroutines.flow.AbstractFlow
2122
import kotlinx.coroutines.flow.FlowCollector
23+
import kotlinx.coroutines.isActive
2224
import java.util.concurrent.atomic.AtomicReference
25+
import kotlin.coroutines.coroutineContext
2326

2427
/**
2528
* Caches one item and replays it to fresh collectors.
@@ -134,7 +137,11 @@ class BehaviorSubject<T> : AbstractFlow<T>, SubjectAPI<T> {
134137

135138
if (curr.value != NONE) {
136139
try {
137-
collector.emit(curr.value)
140+
if (coroutineContext.isActive) {
141+
collector.emit(curr.value)
142+
} else {
143+
throw CancellationException()
144+
}
138145
} catch (exc: Throwable) {
139146
remove(inner)
140147

@@ -160,7 +167,11 @@ class BehaviorSubject<T> : AbstractFlow<T>, SubjectAPI<T> {
160167
}
161168

162169
try {
163-
collector.emit(next.value)
170+
if (coroutineContext.isActive) {
171+
collector.emit(next.value)
172+
} else {
173+
throw CancellationException()
174+
}
164175
} catch (exc: Throwable) {
165176
remove(inner)
166177

src/main/kotlin/hu/akarnokd/kotlin/flow/FlowExtensions.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,13 @@ fun <T> Flow<T>.toList() : Flow<List<T>> {
164164
@FlowPreview
165165
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)
166166

167+
/**
168+
* Maps items from the upstream to [Flow] and relays its items while dropping upstream items
169+
* until the current inner [Flow] completes.
170+
*/
171+
@FlowPreview
172+
fun <T, R> Flow<T>.flatMapDrop(mapper: suspend (T) -> Flow<R>) : Flow<R> = FlowFlatMapDrop(this, mapper)
173+
167174
// -----------------------------------------------------------------------------------------
168175
// Parallel Extensions
169176
// -----------------------------------------------------------------------------------------

src/main/kotlin/hu/akarnokd/kotlin/flow/ReplaySubject.kt

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
package hu.akarnokd.kotlin.flow
1818

19+
import kotlinx.coroutines.CancellationException
1920
import kotlinx.coroutines.FlowPreview
2021
import kotlinx.coroutines.flow.AbstractFlow
2122
import kotlinx.coroutines.flow.FlowCollector
23+
import kotlinx.coroutines.isActive
2224
import java.util.concurrent.TimeUnit
2325
import java.util.concurrent.atomic.AtomicReference
26+
import kotlin.coroutines.coroutineContext
2427

2528
/**
2629
* Caches and replays some or all items to collectors.
@@ -240,8 +243,12 @@ class ReplaySubject<T> : AbstractFlow<T>, SubjectAPI<T> {
240243

241244
if (!empty) {
242245
try {
243-
consumer.consumer.emit(list[consumer.index.toInt()])
244-
consumer.index++
246+
if (coroutineContext.isActive) {
247+
consumer.consumer.emit(list[consumer.index.toInt()])
248+
consumer.index++
249+
} else {
250+
throw CancellationException()
251+
}
245252
} catch (ex: Throwable) {
246253
consumer.parent.remove(consumer)
247254

@@ -317,8 +324,12 @@ class ReplaySubject<T> : AbstractFlow<T>, SubjectAPI<T> {
317324

318325
if (!empty) {
319326
try {
320-
consumer.consumer.emit(next.value!!)
321-
consumer.node = next
327+
if (coroutineContext.isActive) {
328+
consumer.consumer.emit(next.value!!)
329+
consumer.node = next
330+
} else {
331+
throw CancellationException()
332+
}
322333
} catch (ex: Throwable) {
323334
consumer.parent.remove(consumer)
324335

@@ -435,8 +446,12 @@ class ReplaySubject<T> : AbstractFlow<T>, SubjectAPI<T> {
435446

436447
if (!empty) {
437448
try {
438-
consumer.consumer.emit(next.value!!)
439-
consumer.node = next
449+
if (coroutineContext.isActive) {
450+
consumer.consumer.emit(next.value!!)
451+
consumer.node = next
452+
} else {
453+
throw CancellationException()
454+
}
440455
} catch (ex: Throwable) {
441456
consumer.parent.remove(consumer)
442457

src/main/kotlin/hu/akarnokd/kotlin/flow/ResumableCollector.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ open class ResumableCollector<T> : Resumable() {
8686
if (coroutineContext.isActive) {
8787
collector.emit(v)
8888
} else {
89-
onComplete?.invoke(this)
9089
throw CancellationException()
9190
}
9291
} catch (exc: Throwable) {
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2019 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package hu.akarnokd.kotlin.flow.impl
17+
18+
import hu.akarnokd.kotlin.flow.Resumable
19+
import kotlinx.coroutines.*
20+
import kotlinx.coroutines.flow.AbstractFlow
21+
import kotlinx.coroutines.flow.Flow
22+
import kotlinx.coroutines.flow.FlowCollector
23+
import kotlinx.coroutines.flow.collect
24+
import java.util.concurrent.atomic.AtomicBoolean
25+
import java.util.concurrent.atomic.AtomicReference
26+
27+
@FlowPreview
28+
internal class FlowFlatMapDrop<T, R>(private val source: Flow<T>, private val mapper: suspend (T) -> Flow<R>) : AbstractFlow<R>() {
29+
@ExperimentalCoroutinesApi
30+
@InternalCoroutinesApi
31+
override suspend fun collectSafely(collector: FlowCollector<R>) {
32+
coroutineScope {
33+
34+
val resume = Resumable();
35+
val consumerReady = AtomicBoolean(true)
36+
val value = AtomicReference<T>()
37+
val hasValue = AtomicBoolean()
38+
val done = AtomicBoolean()
39+
val error = AtomicReference<Throwable>()
40+
41+
val job = launch {
42+
try {
43+
source.collect {
44+
if (consumerReady.get()) {
45+
consumerReady.set(false)
46+
value.lazySet(it)
47+
hasValue.set(true);
48+
resume.resume()
49+
}
50+
}
51+
done.set(true)
52+
} catch (ex: Throwable) {
53+
error.set(ex)
54+
}
55+
resume.resume()
56+
}
57+
58+
while (coroutineContext.isActive) {
59+
resume.await()
60+
61+
val d = done.get()
62+
val e = error.get()
63+
val h = hasValue.get()
64+
val v = value.get()
65+
66+
if (e != null && !h) {
67+
throw e;
68+
}
69+
70+
if (d && !h) {
71+
break;
72+
}
73+
74+
if (h) {
75+
value.lazySet(null)
76+
hasValue.set(false)
77+
try {
78+
mapper(v).collect {
79+
collector.emit(it)
80+
}
81+
} catch (ex: Throwable) {
82+
job.cancel()
83+
throw ex
84+
}
85+
}
86+
87+
consumerReady.set(true);
88+
}
89+
}
90+
}
91+
}

src/main/kotlin/hu/akarnokd/kotlin/flow/impl/FlowOnBackpressureDrop.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2019 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package hu.akarnokd.kotlin.flow.impl
218

319
import hu.akarnokd.kotlin.flow.Resumable

src/test/kotlin/hu/akarnokd/kotlin/flow/BehaviorSubjectTest.kt

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,4 +363,88 @@ class BehaviorSubjectTest {
363363
assertEquals<Any?>(listOf(null), result)
364364
}
365365
}
366+
367+
@Test
368+
fun cancelledConsumer() = runBlocking {
369+
withSingle {
370+
val subject = BehaviorSubject<Int>()
371+
372+
val expected = 3
373+
val n = 10
374+
375+
val counter1 = AtomicInteger()
376+
377+
val job1 = launch(it.asCoroutineDispatcher()) {
378+
subject.collect {
379+
if (counter1.incrementAndGet() == expected) {
380+
cancel()
381+
}
382+
}
383+
}
384+
385+
while (!subject.hasCollectors()) {
386+
delay(1)
387+
}
388+
389+
for (i in 1..n) {
390+
subject.emit(i)
391+
}
392+
393+
// wait for the subject to finish
394+
for (i in 1..1000) {
395+
if (job1.isCancelled) {
396+
break;
397+
}
398+
delay(10)
399+
}
400+
401+
assertEquals(true, job1.isCancelled)
402+
assertEquals(expected, counter1.get())
403+
assertEquals(0, subject.collectorCount())
404+
}
405+
406+
}
407+
408+
@Test
409+
fun cancelledOneCollectorSecondCompletes() = runBlocking {
410+
withSingle {
411+
val subject = BehaviorSubject<Int>()
412+
413+
val expected = 3
414+
val n = 10
415+
416+
val counter1 = AtomicInteger()
417+
val counter2 = AtomicInteger()
418+
419+
val job1 = launch(it.asCoroutineDispatcher()) {
420+
subject.collect {
421+
if (counter1.incrementAndGet() == expected) {
422+
cancel()
423+
}
424+
}
425+
}
426+
427+
val job2 = launch(it.asCoroutineDispatcher()) {
428+
subject.collect { counter2.incrementAndGet() }
429+
}
430+
431+
while (subject.collectorCount() != 2) {
432+
delay(1)
433+
}
434+
435+
for (i in 1..n) {
436+
subject.emit(i)
437+
}
438+
439+
subject.complete()
440+
job2.join()
441+
442+
assertEquals(true, job1.isCancelled)
443+
assertEquals(true, job2.isCompleted)
444+
assertEquals(expected, counter1.get())
445+
assertEquals(n, counter2.get())
446+
assertEquals(0, subject.collectorCount())
447+
}
448+
449+
}
366450
}

src/test/kotlin/hu/akarnokd/kotlin/flow/PublishSubjectTest.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,14 @@ class PublishSubjectTest {
301301
subject.emit(i)
302302
}
303303

304+
// wait for the subject to finish
305+
for (i in 1..1000) {
306+
if (job1.isCancelled) {
307+
break;
308+
}
309+
delay(10)
310+
}
311+
304312
assertEquals(true, job1.isCancelled)
305313
assertEquals(expected, counter1.get())
306314
assertEquals(0, subject.collectorCount())

0 commit comments

Comments
 (0)