Skip to content

Commit a53b65f

Browse files
committed
Add UnicastSubject and UnicastWorkSubject
1 parent 4065e0b commit a53b65f

File tree

6 files changed

+670
-1
lines changed

6 files changed

+670
-1
lines changed

README.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ Table of contents
2323
- [PublishSubject](#publishsubject)
2424
- [ReplaySubject](#replaysubject)
2525
- [BehaviorSubject](#behaviorsubject)
26+
- [UnicastSubject](#unicastsubject)
27+
- [UnicastWorkSubject](#unicastworksubject)
2628
- Sources
2729
- `range`
2830
- `timer`
@@ -209,3 +211,52 @@ can be used to hold back the upstream until the expected number of collectors ha
209211
In the example, it is known `merge` will establish 2 collectors, thus the `publish` can be instructed to await those 2.
210212
Without the argument, `range` would rush through its items as `merge` doesn't start collecting in time, causing an
211213
empty result list.
214+
215+
## UnicastSubject
216+
217+
Buffers items until a single collector starts collecting items. Use `collectorCancelled` to
218+
detect when the collector no longer wants to collect items.
219+
220+
Note that the subject uses an unbounded inner buffer and does not suspend its input side if
221+
the collector never arrives or can't keep up.
222+
223+
```kotlin
224+
val us = UnicastSubject()
225+
226+
launchIn(Dispatchers.IO) {
227+
for (i in 1..200) {
228+
println("Emitting $i")
229+
us.emit(i)
230+
delay(1)
231+
}
232+
emit.complete()
233+
}
234+
235+
// collector arrives late for some reason
236+
delay(100)
237+
238+
us.collect { println("Collecting $it") }
239+
```
240+
241+
## UnicastWorkSubject
242+
243+
Buffers items until and inbetween a single collector is able to collect items. If the current
244+
collector cancels, the next collector will receive the subsequent items.
245+
246+
Note that the subject uses an unbounded inner buffer and does not suspend its input side if
247+
the collector never arrives or can't keep up.
248+
249+
```kotlin
250+
val uws = UnicastWorkSubject()
251+
252+
generateInts(uws, 1, 15)
253+
254+
// prints lines 1..5
255+
uws.take(5).collect { println(it) }
256+
257+
// prints lines 6..10
258+
uws.take(5).collect { println(it) }
259+
260+
// prints lines 11..15
261+
uws.take(5).collect { println(it) }
262+
```

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
GROUP=com.github.akarnokd
2-
VERSION_NAME=0.0.7
2+
VERSION_NAME=0.0.8
33

44
POM_ARTIFACT_ID=kotlin-flow-extensions
55
POM_NAME=Kotlin Flow Extensions
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2019-2020 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.kotlin.flow
18+
19+
import kotlinx.coroutines.flow.AbstractFlow
20+
import kotlinx.coroutines.flow.FlowCollector
21+
import java.util.concurrent.ConcurrentLinkedQueue
22+
import java.util.concurrent.atomic.AtomicReference
23+
24+
/**
25+
* Buffers items until a single collector collects them.
26+
*
27+
* @param <T> the item type
28+
*/
29+
class UnicastSubject<T> : AbstractFlow<T>(), SubjectAPI<T> {
30+
31+
companion object {
32+
val terminatedCollector = object : FlowCollector<Any> {
33+
override suspend fun emit(value: Any) {
34+
}
35+
}
36+
val terminated = Throwable("No more elements")
37+
}
38+
39+
val queue = ConcurrentLinkedQueue<T>()
40+
41+
@Volatile
42+
var terminal : Throwable? = null
43+
44+
val resumable = Resumable()
45+
46+
val current = AtomicReference<FlowCollector<T>>()
47+
48+
override suspend fun collectSafely(collector: FlowCollector<T>) {
49+
while (true) {
50+
val curr = current.get()
51+
if (curr != null) {
52+
throw IllegalStateException("Only one collector allowed")
53+
}
54+
if (current.compareAndSet(curr, collector)) {
55+
break
56+
}
57+
}
58+
59+
while (true) {
60+
val t = terminal
61+
val v = queue.poll()
62+
63+
if (t != null && v == null) {
64+
current.getAndSet(terminatedCollector as FlowCollector<T>)
65+
if (t != terminated) {
66+
throw t
67+
}
68+
return
69+
}
70+
if (v != null) {
71+
try {
72+
collector.emit(v)
73+
} catch (e: Throwable) {
74+
current.getAndSet(terminatedCollector as FlowCollector<T>)
75+
queue.clear()
76+
throw e
77+
}
78+
} else {
79+
resumable.await()
80+
}
81+
}
82+
}
83+
84+
override suspend fun emitError(ex: Throwable) {
85+
terminal = ex
86+
resumable.resume()
87+
}
88+
89+
override suspend fun complete() {
90+
terminal = terminated
91+
resumable.resume()
92+
}
93+
94+
override fun hasCollectors(): Boolean {
95+
val curr = current.get()
96+
return curr != null && curr != terminatedCollector
97+
}
98+
99+
override fun collectorCount(): Int {
100+
return if (hasCollectors()) 1 else 0
101+
}
102+
103+
override suspend fun emit(value: T) {
104+
if (current.get() != terminatedCollector) {
105+
queue.offer(value)
106+
resumable.resume()
107+
} else {
108+
queue.clear()
109+
}
110+
}
111+
112+
fun collectorCancelled() : Boolean {
113+
return current.get() == terminatedCollector
114+
}
115+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2019-2020 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.kotlin.flow
18+
19+
import kotlinx.coroutines.flow.AbstractFlow
20+
import kotlinx.coroutines.flow.FlowCollector
21+
import java.util.concurrent.ConcurrentLinkedQueue
22+
import java.util.concurrent.atomic.AtomicReference
23+
24+
/**
25+
* Buffers items until a single collector collects them.
26+
*
27+
* @param <T> the item type
28+
*/
29+
class UnicastWorkSubject<T> : AbstractFlow<T>(), SubjectAPI<T> {
30+
31+
companion object {
32+
val terminated = Throwable("No more elements")
33+
}
34+
35+
val queue = ConcurrentLinkedQueue<T>()
36+
37+
@Volatile
38+
var terminal : Throwable? = null
39+
40+
val resumable = Resumable()
41+
42+
val current = AtomicReference<FlowCollector<T>>()
43+
44+
override suspend fun collectSafely(collector: FlowCollector<T>) {
45+
while (true) {
46+
val curr = current.get()
47+
if (curr != null) {
48+
throw IllegalStateException("Only one collector allowed")
49+
}
50+
if (current.compareAndSet(curr, collector)) {
51+
break
52+
}
53+
}
54+
55+
while (true) {
56+
val t = terminal
57+
val v = queue.poll()
58+
59+
if (t != null && v == null) {
60+
current.getAndSet(null)
61+
if (t != terminated) {
62+
throw t
63+
}
64+
return
65+
}
66+
if (v != null) {
67+
try {
68+
collector.emit(v)
69+
} catch (e: Throwable) {
70+
current.getAndSet(null)
71+
throw e
72+
}
73+
} else {
74+
resumable.await()
75+
}
76+
}
77+
}
78+
79+
override suspend fun emitError(ex: Throwable) {
80+
terminal = ex
81+
resumable.resume()
82+
}
83+
84+
override suspend fun complete() {
85+
terminal = terminated
86+
resumable.resume()
87+
}
88+
89+
override fun hasCollectors(): Boolean {
90+
val curr = current.get()
91+
return curr != null
92+
}
93+
94+
override fun collectorCount(): Int {
95+
return if (hasCollectors()) 1 else 0
96+
}
97+
98+
override suspend fun emit(value: T) {
99+
queue.offer(value)
100+
resumable.resume()
101+
}
102+
}

0 commit comments

Comments
 (0)