Skip to content

Commit 265e3d5

Browse files
committed
Fix hang in publish(n) when more collect
1 parent adec011 commit 265e3d5

File tree

4 files changed

+37
-4
lines changed

4 files changed

+37
-4
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Extensions to the Kotlin Flow library.
1111

1212
```groovy
1313
dependencies {
14-
implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.6"
14+
implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.7"
1515
}
1616
```
1717

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
GROUP=com.github.akarnokd
2-
VERSION_NAME=0.0.6
2+
VERSION_NAME=0.0.7
33

44
POM_ARTIFACT_ID=kotlin-flow-extensions
55
POM_NAME=Kotlin Flow Extensions

src/main/kotlin/hu/akarnokd/kotlin/flow/MulticastSubject.kt

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,17 @@ class MulticastSubject<T>(private val expectedCollectors: Int) : AbstractFlow<T>
144144
override suspend fun collectSafely(collector: FlowCollector<T>) {
145145
val rc = ResumableCollector<T>()
146146
if (add(rc)) {
147-
if (remainingCollectors.decrementAndGet() == 0) {
148-
producer.resume()
147+
while (true) {
148+
val a = remainingCollectors.get()
149+
if (a == 0) {
150+
break;
151+
}
152+
if (remainingCollectors.compareAndSet(a, a - 1)) {
153+
if (a == 1) {
154+
producer.resume();
155+
}
156+
break;
157+
}
149158
}
150159
rc.drain(collector) { remove(it) }
151160
} else {

src/test/kotlin/hu/akarnokd/kotlin/flow/MulticastSubjectTest.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,4 +466,28 @@ class MulticastSubjectTest {
466466
job1.join()
467467
job2.join()
468468
}
469+
470+
@Test(timeout = 5000)
471+
@ExperimentalCoroutinesApi
472+
fun moreThanExpectedCollectors() = runBlocking {
473+
val subject = MulticastSubject<Int>(2)
474+
val result = mutableListOf<Int>()
475+
476+
val job = launch(Dispatchers.IO) {
477+
merge(subject, subject, subject)
478+
.collect { result.add(it) }
479+
}
480+
481+
// wait for the collector to arrive
482+
while (subject.collectorCount() != 3) {
483+
delay(1)
484+
}
485+
486+
subject.emit(1)
487+
subject.complete()
488+
489+
job.join()
490+
491+
assertEquals(listOf(1, 1, 1), result)
492+
}
469493
}

0 commit comments

Comments
 (0)