11package org.akanework.gramophone.logic.utils
22
3- import android.util.Log
43import androidx.lifecycle.Lifecycle
54import androidx.lifecycle.LifecycleOwner
65import androidx.lifecycle.lifecycleScope
76import androidx.lifecycle.repeatOnLifecycle
87import kotlinx.coroutines.CoroutineScope
8+ import kotlinx.coroutines.Dispatchers
99import kotlinx.coroutines.ExperimentalCoroutinesApi
1010import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
1111import kotlinx.coroutines.Job
@@ -19,11 +19,14 @@ import kotlinx.coroutines.flow.Flow
1919import kotlinx.coroutines.flow.FlowCollector
2020import kotlinx.coroutines.flow.MutableStateFlow
2121import kotlinx.coroutines.flow.SharedFlow
22+ import kotlinx.coroutines.flow.SharingCommand
2223import kotlinx.coroutines.flow.SharingStarted
24+ import kotlinx.coroutines.flow.SharingStarted.Companion.Eagerly
2325import kotlinx.coroutines.flow.SharingStarted.Companion.WhileSubscribed
2426import kotlinx.coroutines.flow.StateFlow
2527import kotlinx.coroutines.flow.channelFlow
2628import kotlinx.coroutines.flow.collect
29+ import kotlinx.coroutines.flow.combine
2730import kotlinx.coroutines.flow.emptyFlow
2831import kotlinx.coroutines.flow.first
2932import kotlinx.coroutines.flow.flatMapLatest
@@ -39,7 +42,7 @@ import kotlin.coroutines.EmptyCoroutineContext
3942import kotlin.time.Duration
4043
4144interface PauseManager : CoroutineContext .Element {
42- val isPaused: Flow <Boolean >
45+ val isPaused: StateFlow <Boolean >
4346
4447 override val key: CoroutineContext .Key <* > get() = Key
4548 companion object Key : CoroutineContext.Key<PauseManager>
@@ -67,22 +70,34 @@ object EmptyPauseManager : PauseManager {
6770}
6871
6972@OptIn(ExperimentalCoroutinesApi ::class )
70- class CountingPauseManager : PauseManager {
71- private val flows = MutableStateFlow (listOf<Flow <Boolean >>())
72- override val isPaused = flows.mapLatest { it.find { ! it.first() } == null } // will pause when 0 items
73+ class CountingPauseManager (paused : SharingStarted ) : PauseManager {
74+ private val flows = MutableStateFlow (listOf<PauseManager >())
75+ override val isPaused = paused.command(flows.flatMapLatest {
76+ combine(it.map { it.isPaused }) { it.size - it.count { it } }
77+ }.stateIn(CoroutineScope (Dispatchers .Default ), Eagerly , 0 ))
78+ .mapLatest {
79+ when (it) {
80+ SharingCommand .START -> false
81+ SharingCommand .STOP ,
82+ SharingCommand .STOP_AND_RESET_REPLAY_CACHE -> true
83+ }
84+ }.stateIn(CoroutineScope (Dispatchers .Default ), Eagerly , true )
85+ // override val isPaused = flows.flatMapLatest {
86+ // combine(it.map { it.isPaused }) { !it.contains(false) }
87+ // }.stateIn(CoroutineScope(Dispatchers.Default), Eagerly, true)
7388
7489 fun add (other : PauseManager ) {
75- flows.value + = other.isPaused
90+ flows.value + = other
7691 }
7792
7893 fun remove (other : PauseManager ) {
79- flows.value - = other.isPaused
94+ flows.value - = other
8095 }
8196}
8297
8398@OptIn(ExperimentalForInheritanceCoroutinesApi ::class )
84- class PauseManagingSharedFlow <T >() : SharedFlow<T> {
85- private val pauseManager = CountingPauseManager ()
99+ class PauseManagingSharedFlow <T >(paused : SharingStarted ) : SharedFlow<T> {
100+ private val pauseManager = CountingPauseManager (paused )
86101 lateinit var sharedFlow: SharedFlow <T >
87102
88103 override val replayCache: List <T >
@@ -91,13 +106,11 @@ class PauseManagingSharedFlow<T>() : SharedFlow<T> {
91106 override suspend fun collect (collector : FlowCollector <T >): Nothing {
92107 val pm = currentCoroutineContext()[PauseManager ] ? : EmptyPauseManager
93108 try {
94- Log .w(" Tag" , java.lang.IllegalStateException (" register" ))
95109 pauseManager.add(pm)
96110 withContext(pauseManager) {
97111 sharedFlow.collect(collector)
98112 }
99113 } finally {
100- Log .w(" Tag" , java.lang.IllegalStateException (" remove" ))
101114 pauseManager.remove(pm)
102115 }
103116 }
@@ -106,9 +119,10 @@ class PauseManagingSharedFlow<T>() : SharedFlow<T> {
106119 fun <T > Flow<T>.sharePauseableIn (
107120 scope : CoroutineScope ,
108121 started : SharingStarted ,
122+ paused : SharingStarted ,
109123 replay : Int = 0
110124 ): SharedFlow <T > {
111- val wrapper = PauseManagingSharedFlow <T >()
125+ val wrapper = PauseManagingSharedFlow <T >(paused )
112126 wrapper.sharedFlow = shareIn(CoroutineScope (scope.coroutineContext + Job (scope.coroutineContext[Job ])
113127 + wrapper.pauseManager), started, replay)
114128 return wrapper
@@ -117,8 +131,8 @@ class PauseManagingSharedFlow<T>() : SharedFlow<T> {
117131}
118132
119133@OptIn(ExperimentalForInheritanceCoroutinesApi ::class )
120- class PauseManagingStateFlow <T >() : StateFlow<T> {
121- private val pauseManager = CountingPauseManager ()
134+ class PauseManagingStateFlow <T >(paused : SharingStarted ) : StateFlow<T> {
135+ private val pauseManager = CountingPauseManager (paused )
122136 lateinit var stateFlow: StateFlow <T >
123137
124138 override val replayCache: List <T >
@@ -143,9 +157,10 @@ class PauseManagingStateFlow<T>() : StateFlow<T> {
143157 fun <T > Flow<T>.statePauseableIn (
144158 scope : CoroutineScope ,
145159 started : SharingStarted ,
160+ paused : SharingStarted ,
146161 initialValue : T
147162 ): SharedFlow <T > {
148- val wrapper = PauseManagingStateFlow <T >()
163+ val wrapper = PauseManagingStateFlow <T >(paused )
149164 wrapper.stateFlow = stateIn(CoroutineScope (scope.coroutineContext + Job (scope.coroutineContext[Job ])
150165 + wrapper.pauseManager), started, initialValue)
151166 return wrapper
@@ -158,14 +173,8 @@ suspend fun <T> repeatFlowWhenUnpaused(enforcePauseable: Boolean = false, block:
158173 val pauseManager = currentCoroutineContext()[PauseManager ]
159174 return if (pauseManager != null ) {
160175 pauseManager.isPaused.flatMapLatest {
161- Log .e(" hi" , " hii ${pauseManager.isPaused.first()} " )
162- if (! it) {
163- try {
164- flowOf(block())
165- } finally {
166- Log .e(" hi" , " byee ${pauseManager.isPaused.first()} " )
167- }
168- }
176+ if (! it)
177+ flowOf(block())
169178 else emptyFlow()
170179 }
171180 } else {
0 commit comments