1+ package org.akanework.gramophone.logic.utils
2+
3+ import android.util.Log
4+ import androidx.lifecycle.Lifecycle
5+ import androidx.lifecycle.LifecycleOwner
6+ import androidx.lifecycle.lifecycleScope
7+ import androidx.lifecycle.repeatOnLifecycle
8+ import kotlinx.coroutines.CoroutineScope
9+ import kotlinx.coroutines.ExperimentalCoroutinesApi
10+ import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
11+ import kotlinx.coroutines.Job
12+ import kotlinx.coroutines.channels.Channel
13+ import kotlinx.coroutines.channels.consume
14+ import kotlinx.coroutines.channels.produce
15+ import kotlinx.coroutines.coroutineScope
16+ import kotlinx.coroutines.currentCoroutineContext
17+ import kotlinx.coroutines.delay
18+ import kotlinx.coroutines.flow.Flow
19+ import kotlinx.coroutines.flow.FlowCollector
20+ import kotlinx.coroutines.flow.MutableStateFlow
21+ import kotlinx.coroutines.flow.SharedFlow
22+ import kotlinx.coroutines.flow.SharingStarted
23+ import kotlinx.coroutines.flow.SharingStarted.Companion.WhileSubscribed
24+ import kotlinx.coroutines.flow.StateFlow
25+ import kotlinx.coroutines.flow.channelFlow
26+ import kotlinx.coroutines.flow.collect
27+ import kotlinx.coroutines.flow.emptyFlow
28+ import kotlinx.coroutines.flow.first
29+ import kotlinx.coroutines.flow.flatMapLatest
30+ import kotlinx.coroutines.flow.flowOf
31+ import kotlinx.coroutines.flow.map
32+ import kotlinx.coroutines.flow.mapLatest
33+ import kotlinx.coroutines.flow.shareIn
34+ import kotlinx.coroutines.flow.stateIn
35+ import kotlinx.coroutines.launch
36+ import kotlinx.coroutines.withContext
37+ import kotlin.coroutines.CoroutineContext
38+ import kotlin.coroutines.EmptyCoroutineContext
39+ import kotlin.time.Duration
40+
41+ interface PauseManager : CoroutineContext .Element {
42+ val isPaused: Flow <Boolean >
43+
44+ override val key: CoroutineContext .Key <* > get() = Key
45+ companion object Key : CoroutineContext.Key<PauseManager>
46+ }
47+
48+ class LifecyclePauseManager (scope : CoroutineScope , source : LifecycleOwner , minimumState : Lifecycle .State )
49+ : PauseManager {
50+ override val isPaused = source.lifecycle.currentStateFlow.map { ! it.isAtLeast(minimumState) }
51+ .stateIn(scope, WhileSubscribed (), ! source.lifecycle.currentState.isAtLeast(minimumState))
52+ }
53+
54+ object EmptyPauseManager : PauseManager {
55+ @OptIn(ExperimentalForInheritanceCoroutinesApi ::class )
56+ override val isPaused = object : StateFlow <Boolean > {
57+ override val value: Boolean = false
58+ override val replayCache = listOf (false )
59+
60+ override suspend fun collect (collector : FlowCollector <Boolean >): Nothing {
61+ collector.emit(false )
62+ while (true )
63+ delay(Duration .INFINITE )
64+ }
65+
66+ }
67+ }
68+
69+ @OptIn(ExperimentalCoroutinesApi ::class )
70+ class CountingPauseManager : PauseManager {
71+ private val flows = MutableStateFlow (listOf<Flow <Boolean >>())
72+ override val isPaused = flows.mapLatest { it.find { ! it.first() } == null } // will pause when 0 items
73+
74+ fun add (other : PauseManager ) {
75+ flows.value + = other.isPaused
76+ }
77+
78+ fun remove (other : PauseManager ) {
79+ flows.value - = other.isPaused
80+ }
81+ }
82+
83+ @OptIn(ExperimentalForInheritanceCoroutinesApi ::class )
84+ class PauseManagingSharedFlow <T >() : SharedFlow<T> {
85+ private val pauseManager = CountingPauseManager ()
86+ lateinit var sharedFlow: SharedFlow <T >
87+
88+ override val replayCache: List <T >
89+ get() = sharedFlow.replayCache
90+
91+ override suspend fun collect (collector : FlowCollector <T >): Nothing {
92+ val pm = currentCoroutineContext()[PauseManager ] ? : EmptyPauseManager
93+ try {
94+ Log .w(" Tag" , java.lang.IllegalStateException (" register" ))
95+ pauseManager.add(pm)
96+ withContext(pauseManager) {
97+ sharedFlow.collect(collector)
98+ }
99+ } finally {
100+ Log .w(" Tag" , java.lang.IllegalStateException (" remove" ))
101+ pauseManager.remove(pm)
102+ }
103+ }
104+
105+ companion object {
106+ fun <T > Flow<T>.sharePauseableIn (
107+ scope : CoroutineScope ,
108+ started : SharingStarted ,
109+ replay : Int = 0
110+ ): SharedFlow <T > {
111+ val wrapper = PauseManagingSharedFlow <T >()
112+ wrapper.sharedFlow = shareIn(CoroutineScope (scope.coroutineContext + Job (scope.coroutineContext[Job ])
113+ + wrapper.pauseManager), started, replay)
114+ return wrapper
115+ }
116+ }
117+ }
118+
119+ @OptIn(ExperimentalForInheritanceCoroutinesApi ::class )
120+ class PauseManagingStateFlow <T >() : StateFlow<T> {
121+ private val pauseManager = CountingPauseManager ()
122+ lateinit var stateFlow: StateFlow <T >
123+
124+ override val replayCache: List <T >
125+ get() = stateFlow.replayCache
126+
127+ override suspend fun collect (collector : FlowCollector <T >): Nothing {
128+ val pm = currentCoroutineContext()[PauseManager ] ? : EmptyPauseManager
129+ try {
130+ pauseManager.add(pm)
131+ withContext(pauseManager) {
132+ stateFlow.collect(collector)
133+ }
134+ } finally {
135+ pauseManager.remove(pm)
136+ }
137+ }
138+
139+ override val value: T
140+ get() = stateFlow.value
141+
142+ companion object {
143+ fun <T > Flow<T>.statePauseableIn (
144+ scope : CoroutineScope ,
145+ started : SharingStarted ,
146+ initialValue : T
147+ ): SharedFlow <T > {
148+ val wrapper = PauseManagingStateFlow <T >()
149+ wrapper.stateFlow = stateIn(CoroutineScope (scope.coroutineContext + Job (scope.coroutineContext[Job ])
150+ + wrapper.pauseManager), started, initialValue)
151+ return wrapper
152+ }
153+ }
154+ }
155+
156+ @OptIn(ExperimentalCoroutinesApi ::class )
157+ suspend fun <T > repeatFlowWhenUnpaused (enforcePauseable : Boolean = false, block : suspend () -> T ): Flow <T > {
158+ val pauseManager = currentCoroutineContext()[PauseManager ]
159+ return if (pauseManager != null ) {
160+ pauseManager.isPaused.flatMapLatest {
161+ Log .e(" hi" , " hii ${pauseManager.isPaused.first()} " )
162+ if (! it) {
163+ try {
164+ flowOf(block())
165+ } finally {
166+ Log .e(" hi" , " byee ${pauseManager.isPaused.first()} " )
167+ }
168+ }
169+ else emptyFlow()
170+ }
171+ } else {
172+ if (enforcePauseable)
173+ throw IllegalStateException (" enforcePauseable is set to true, expected to find PauseManager" )
174+ flowOf(block())
175+ }
176+ }
177+
178+ suspend fun repeatWhenUnpaused (enforcePauseable : Boolean = false, block : suspend () -> Unit ) {
179+ repeatFlowWhenUnpaused(enforcePauseable, block).collect()
180+ }
181+
182+ suspend fun <T > repeatUntilDoneWhenUnpaused (enforcePauseable : Boolean = false, block : suspend () -> T ): T {
183+ return repeatFlowWhenUnpaused(enforcePauseable, block).first()
184+ }
185+
186+ fun repeatPausingWithLifecycle (source : LifecycleOwner ,
187+ context : CoroutineContext = EmptyCoroutineContext ,
188+ minimumStateForCollect : Lifecycle .State = Lifecycle .State .CREATED ,
189+ minimumStateForUnpause : Lifecycle .State = Lifecycle .State .RESUMED ,
190+ block : suspend () -> Unit ) {
191+ source.lifecycleScope.launch {
192+ source.repeatOnLifecycle(minimumStateForCollect) {
193+ withContext(context + LifecyclePauseManager (this , source, minimumStateForUnpause)) {
194+ block()
195+ }
196+ }
197+ }
198+ }
199+
200+ // Downstream will still finish processing values, to avoid, wrap downstream in repeatUntilDoneWhenUnpaused
201+ @OptIn(ExperimentalCoroutinesApi ::class )
202+ fun <T > Flow<T>.bufferAndBlockWhenPaused (capacity : Int = Channel .UNLIMITED , enforcePauseable : Boolean = false): Flow <T > = channelFlow {
203+ coroutineScope {
204+ produce(capacity = capacity) {
205+ collect { item -> send(item) }
206+ }.consume {
207+ repeatWhenUnpaused(enforcePauseable) {
208+ while (true ) {
209+ val item = receiveCatching()
210+ if (item.isClosed) {
211+ close()
212+ break
213+ }
214+ send(item.getOrThrow())
215+ }
216+ }
217+ }
218+ }
219+ }
220+
221+ @Suppress(" NOTHING_TO_INLINE" )
222+ inline fun <T > Flow<T>.conflateAndBlockWhenPaused (enforcePauseable : Boolean = false) =
223+ bufferAndBlockWhenPaused(Channel .CONFLATED , enforcePauseable)
0 commit comments