Skip to content

Commit 6aea829

Browse files
committed
Use cancelable suspensions
1 parent f95030a commit 6aea829

File tree

11 files changed

+108
-24
lines changed

11 files changed

+108
-24
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
GROUP=com.github.akarnokd
2-
VERSION_NAME=0.0.3
2+
VERSION_NAME=0.0.4
33

44
POM_ARTIFACT_ID=kotlin-flow-extensions
55
POM_NAME=Kotlin Flow Extensions

gradle/wrapper/gradle-wrapper.jar

426 Bytes
Binary file not shown.
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
#Wed Apr 01 09:54:21 CEST 2020
2+
distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-all.zip
13
distributionBase=GRADLE_USER_HOME
24
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-5.5.1-bin.zip
4-
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6+
zipStoreBase=GRADLE_USER_HOME

gradlew

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
11
#!/usr/bin/env sh
22

3+
#
4+
# Copyright 2015 the original author or authors.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# https://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
319
##############################################################################
420
##
521
## Gradle start up script for UN*X
@@ -28,7 +44,7 @@ APP_NAME="Gradle"
2844
APP_BASE_NAME=`basename "$0"`
2945

3046
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
31-
DEFAULT_JVM_OPTS='"-Xmx64m"'
47+
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
3248

3349
# Use the maximum available, or set MAX_FD != -1 to use that value.
3450
MAX_FD="maximum"

gradlew.bat

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
@rem
2+
@rem Copyright 2015 the original author or authors.
3+
@rem
4+
@rem Licensed under the Apache License, Version 2.0 (the "License");
5+
@rem you may not use this file except in compliance with the License.
6+
@rem You may obtain a copy of the License at
7+
@rem
8+
@rem https://www.apache.org/licenses/LICENSE-2.0
9+
@rem
10+
@rem Unless required by applicable law or agreed to in writing, software
11+
@rem distributed under the License is distributed on an "AS IS" BASIS,
12+
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
@rem See the License for the specific language governing permissions and
14+
@rem limitations under the License.
15+
@rem
16+
117
@if "%DEBUG%" == "" @echo off
218
@rem ##########################################################################
319
@rem
@@ -14,7 +30,7 @@ set APP_BASE_NAME=%~n0
1430
set APP_HOME=%DIRNAME%
1531

1632
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
17-
set DEFAULT_JVM_OPTS="-Xmx64m"
33+
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
1834

1935
@rem Find java.exe
2036
if defined JAVA_HOME goto findJavaFromJavaHome

src/main/kotlin/hu/akarnokd/kotlin/flow/BehaviorSubject.kt

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,14 @@ class BehaviorSubject<T> : AbstractFlow<T>, SubjectAPI<T> {
8181
current = next
8282

8383
for (collector in collectors.get()) {
84-
collector.consumeReady.await()
84+
try {
85+
collector.consumeReady.await()
86+
87+
collector.resume()
88+
} catch (ex: CancellationException) {
89+
remove(collector)
90+
}
8591

86-
collector.resume()
8792
}
8893
}
8994
}
@@ -100,9 +105,13 @@ class BehaviorSubject<T> : AbstractFlow<T>, SubjectAPI<T> {
100105
current = DONE
101106

102107
for (collector in collectors.getAndSet(TERMINATED)) {
103-
collector.consumeReady.await()
108+
try {
109+
collector.consumeReady.await()
104110

105-
collector.resume()
111+
collector.resume()
112+
} catch (_: CancellationException) {
113+
// ignored
114+
}
106115
}
107116
}
108117
}
@@ -118,9 +127,13 @@ class BehaviorSubject<T> : AbstractFlow<T>, SubjectAPI<T> {
118127
current = DONE
119128

120129
for (collector in collectors.getAndSet(TERMINATED)) {
121-
collector.consumeReady.await()
130+
try {
131+
collector.consumeReady.await()
122132

123-
collector.resume()
133+
collector.resume()
134+
} catch (_: CancellationException) {
135+
// ignored
136+
}
124137
}
125138
}
126139
}

src/main/kotlin/hu/akarnokd/kotlin/flow/PublishSubject.kt

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package hu.akarnokd.kotlin.flow
1818

19+
import kotlinx.coroutines.CancellationException
1920
import kotlinx.coroutines.FlowPreview
2021
import kotlinx.coroutines.flow.AbstractFlow
2122
import kotlinx.coroutines.flow.Flow
@@ -56,7 +57,11 @@ class PublishSubject<T> : AbstractFlow<T>(), SubjectAPI<T> {
5657
*/
5758
override suspend fun emit(value: T) {
5859
for (collector in collectors.get()) {
59-
collector.next(value)
60+
try {
61+
collector.next(value)
62+
} catch (ex: CancellationException) {
63+
remove(collector);
64+
}
6065
}
6166
}
6267

@@ -68,7 +73,11 @@ class PublishSubject<T> : AbstractFlow<T>(), SubjectAPI<T> {
6873
this.error = ex
6974
@Suppress("UNCHECKED_CAST")
7075
for (collector in collectors.getAndSet(TERMINATED as Array<ResumableCollector<T>>)) {
71-
collector.error(ex)
76+
try {
77+
collector.error(ex)
78+
} catch (_: CancellationException) {
79+
// ignored
80+
}
7281
}
7382
}
7483
}
@@ -79,7 +88,11 @@ class PublishSubject<T> : AbstractFlow<T>(), SubjectAPI<T> {
7988
override suspend fun complete() {
8089
@Suppress("UNCHECKED_CAST")
8190
for (collector in collectors.getAndSet(TERMINATED as Array<ResumableCollector<T>>)) {
82-
collector.complete()
91+
try {
92+
collector.complete()
93+
} catch (_: CancellationException) {
94+
// ignored
95+
}
8396
}
8497
}
8598

src/main/kotlin/hu/akarnokd/kotlin/flow/Resumable.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package hu.akarnokd.kotlin.flow
1818

19+
import kotlinx.coroutines.suspendCancellableCoroutine
1920
import java.lang.IllegalStateException
2021
import java.util.concurrent.atomic.AtomicReference
2122
import kotlin.coroutines.Continuation
@@ -39,7 +40,7 @@ open class Resumable : AtomicReference<Continuation<Any>>() {
3940
* Only one thread can call this method.
4041
*/
4142
suspend fun await() {
42-
suspendCoroutine<Any> {
43+
suspendCancellableCoroutine<Any> {
4344
while (true) {
4445
val current = get()
4546
if (current == READY) {

src/main/kotlin/hu/akarnokd/kotlin/flow/ResumableCollector.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ open class ResumableCollector<T> : Resumable() {
7070
}
7171

7272
suspend fun drain(collector: FlowCollector<T>, onComplete: ((ResumableCollector<T>) -> Unit)? = null) {
73-
while (true) {
73+
while (coroutineContext.isActive) {
7474

7575
readyConsumer()
7676

src/test/kotlin/hu/akarnokd/kotlin/flow/BehaviorSubjectTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ class BehaviorSubjectTest {
364364
}
365365
}
366366

367-
@Test
367+
@Test(timeout = 1000)
368368
fun cancelledConsumer() = runBlocking {
369369
withSingle {
370370
val subject = BehaviorSubject<Int>()
@@ -377,7 +377,7 @@ class BehaviorSubjectTest {
377377
val job1 = launch(it.asCoroutineDispatcher()) {
378378
subject.collect {
379379
if (counter1.incrementAndGet() == expected) {
380-
cancel()
380+
throw CancellationException();
381381
}
382382
}
383383
}
@@ -405,7 +405,7 @@ class BehaviorSubjectTest {
405405

406406
}
407407

408-
@Test
408+
@Test(timeout = 1000)
409409
fun cancelledOneCollectorSecondCompletes() = runBlocking {
410410
withSingle {
411411
val subject = BehaviorSubject<Int>()
@@ -419,7 +419,7 @@ class BehaviorSubjectTest {
419419
val job1 = launch(it.asCoroutineDispatcher()) {
420420
subject.collect {
421421
if (counter1.incrementAndGet() == expected) {
422-
cancel()
422+
throw CancellationException();
423423
}
424424
}
425425
}

0 commit comments

Comments
 (0)