-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathFlowUseCase.kt
More file actions
139 lines (123 loc) · 4.89 KB
/
FlowUseCase.kt
File metadata and controls
139 lines (123 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package app.futured.arkitekt.crusecases
import app.futured.arkitekt.crusecases.error.UseCaseErrorHandler
import app.futured.arkitekt.crusecases.scope.CoroutineScopeOwner
import app.futured.arkitekt.crusecases.scope.FlowUseCaseConfig
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlin.coroutines.cancellation.CancellationException
/**
* Base [Flow] use case meant to use in [CoroutineScopeOwner] implementations
*/
abstract class FlowUseCase<ARGS, T> {
/**
* [Job] used to hold and cancel existing run of this use case
*/
var job: Job? = null
/**
* Function which builds Flow instance based on given arguments
* @param args initial use case arguments
*/
abstract fun build(args: ARGS): Flow<T>
}
/**
* Asynchronously executes use case and consumes data from flow on UI thread.
* By default all previous pending executions are canceled, this can be changed
* by [config]. When suspend function in use case finishes, onComplete is called
* on UI thread. This version is gets initial arguments by [args].
*
* In case that an error is thrown during the execution of [FlowUseCase] then
* [UseCaseErrorHandler.globalOnErrorLogger] is called with the error as an argument.
*
* @param args Arguments used for initial use case initialization.
* @param config [FlowUseCaseConfig] used to process results of internal
* Flow and to set configuration options.
**/
context(coroutineScopeOwner: CoroutineScopeOwner)
fun <ARGS, T : Any?> FlowUseCase<ARGS, T>.execute(
args: ARGS,
config: FlowUseCaseConfig.Builder<T, T>.() -> Unit,
) {
val flowUseCaseConfig = FlowUseCaseConfig.Builder<T, T>().run {
config.invoke(this)
return@run build()
}
if (flowUseCaseConfig.disposePrevious) {
job?.cancel()
}
job = build(args)
.flowOn(coroutineScopeOwner.getWorkerDispatcher())
.onStart { flowUseCaseConfig.onStart() }
.onEach { flowUseCaseConfig.onNext(it) }
.onCompletion { error ->
when {
error is CancellationException -> {
// ignore this exception
}
error != null -> {
UseCaseErrorHandler.globalOnErrorLogger(error)
flowUseCaseConfig.onError(error)
}
else -> flowUseCaseConfig.onComplete()
}
}
.catch { /* handled in onCompletion */ }
.launchIn(coroutineScopeOwner.viewModelScope)
}
context(coroutineScopeOwner: CoroutineScopeOwner)
fun <T : Any?> FlowUseCase<Unit, T>.execute(config: FlowUseCaseConfig.Builder<T, T>.() -> Unit) =
execute(Unit, config)
context(coroutineScopeOwner: CoroutineScopeOwner)
fun <T : Any?, M : Any?> FlowUseCase<Unit, T>.executeMapped(config: FlowUseCaseConfig.Builder<T, M>.() -> Unit) =
executeMapped(Unit, config)
/**
* Asynchronously executes use case and consumes data from flow on UI thread.
* By default all previous pending executions are canceled, this can be changed
* by [config]. When suspend function in use case finishes, onComplete is called
* on UI thread. This version is gets initial arguments by [args].
*
* In case that an error is thrown during the execution of [FlowUseCase] then
* [UseCaseErrorHandler.globalOnErrorLogger] is called with the error as an argument.
*
* @param args Arguments used for initial use case initialization.
* @param config [FlowUseCaseConfig] used to process results of internal
* Flow and to set configuration options.
**/
context(coroutineScopeOwner: CoroutineScopeOwner)
fun <ARGS, T : Any?, M : Any?> FlowUseCase<ARGS, T>.executeMapped(
args: ARGS,
config: FlowUseCaseConfig.Builder<T, M>.() -> Unit,
) {
val flowUseCaseConfig = FlowUseCaseConfig.Builder<T, M>().run {
config.invoke(this)
return@run build()
}
if (flowUseCaseConfig.disposePrevious) {
job?.cancel()
}
job = build(args)
.flowOn(coroutineScopeOwner.getWorkerDispatcher())
.onStart { flowUseCaseConfig.onStart() }
.mapNotNull { flowUseCaseConfig.onMap?.invoke(it) }
.onEach { flowUseCaseConfig.onNext(it) }
.onCompletion { error ->
when {
error is CancellationException -> {
// ignore this exception
}
error != null -> {
UseCaseErrorHandler.globalOnErrorLogger(error)
flowUseCaseConfig.onError(error)
}
else -> flowUseCaseConfig.onComplete()
}
}
.catch { /* handled in onCompletion */ }
.launchIn(coroutineScopeOwner.viewModelScope)
}