-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathCoroutineDispatcher.kt
232 lines (219 loc) · 13.1 KB
/
CoroutineDispatcher.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
/**
 * Base class to be extended by all coroutine dispatcher implementations.
 *
 * The following standard implementations are provided by `kotlinx.coroutines` as properties on
 * the [Dispatchers] object:
 *
 * - [Dispatchers.Default] — is used by all standard builders if no dispatcher or any other [ContinuationInterceptor]
 *   is specified in their context. It uses a common pool of shared background threads.
 *   This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
 * - [Dispatchers.IO] — uses a shared pool of on-demand created threads and is designed for offloading of
 *   IO-intensive _blocking_ operations (like file I/O and blocking socket I/O).
 * - [Dispatchers.Unconfined] — starts coroutine execution in the current call-frame until the first suspension,
 *   whereupon the coroutine builder function returns.
 *   The coroutine will later resume in whatever thread is used by the
 *   corresponding suspending function, without confining it to any specific thread or pool.
 *   **The `Unconfined` dispatcher should not normally be used in code**.
 * - Private thread pools can be created with [newSingleThreadContext] and [newFixedThreadPoolContext].
 * - An arbitrary [Executor][java.util.concurrent.Executor] can be converted to a dispatcher with the
 *   [asCoroutineDispatcher] extension function.
 *
 * This class ensures that debugging facilities in [newCoroutineContext] function work properly.
 */
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        { element -> element as? CoroutineDispatcher }
    )

    /**
     * Returns `true` if the execution of the coroutine should be performed with [dispatch] method.
     * The default behavior for most dispatchers is to return `true`.
     *
     * If this method returns `false`, the coroutine is resumed immediately in the current thread,
     * potentially forming an event-loop to prevent stack overflows.
     * The event loop is an advanced topic and its implications can be found in [Dispatchers.Unconfined] documentation.
     *
     * The [context] parameter represents the context of the coroutine that is being dispatched,
     * or [EmptyCoroutineContext] if a non-coroutine-specific [Runnable] is dispatched instead.
     *
     * A dispatcher can override this method to provide a performance optimization and avoid paying a cost of
     * an unnecessary dispatch.
     * E.g. [MainCoroutineDispatcher.immediate] checks whether we are already in the required UI thread in this method
     * and avoids an additional dispatch when it is not required.
     *
     * While this approach can be more efficient, it is not chosen by default to provide a consistent dispatching
     * behavior so that users won't observe unexpected and inconsistent order of events by default.
     *
     * Coroutine builders like [launch][CoroutineScope.launch] and [async][CoroutineScope.async] accept an optional
     * [CoroutineStart] parameter that allows one to optionally choose the [undispatched][CoroutineStart.UNDISPATCHED]
     * behavior to start coroutine immediately, but to be resumed only in the provided dispatcher.
     *
     * This method should generally be exception-safe. An exception thrown from this method
     * may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
     *
     * @see dispatch
     * @see Dispatchers.Unconfined
     */
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

    /**
     * Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
     * The resulting view uses the original dispatcher for execution but with the guarantee that
     * no more than [parallelism] coroutines are executed at the same time.
     *
     * This method does not impose restrictions on the number of views or the total sum of parallelism values,
     * each view controls its own parallelism independently with the guarantee that the effective parallelism
     * of all views cannot exceed the actual parallelism of the original dispatcher.
     *
     * The resulting dispatcher does not guarantee that the coroutines will always be dispatched on the same
     * subset of threads, it only guarantees that at most [parallelism] coroutines are executed at the same time,
     * and reuses threads from the original dispatchers.
     * It does not constitute a resource -- it is a _view_ of the underlying dispatcher that can be thrown away
     * and is not required to be closed.
     *
     * ### Example of usage
     * ```
     * // Background dispatcher for the application
     * val dispatcher = newFixedThreadPoolContext(4, "App Background")
     * // At most 2 threads will be processing images as it is really slow and CPU-intensive
     * val imageProcessingDispatcher = dispatcher.limitedParallelism(2, "Image processor")
     * // At most 3 threads will be processing JSON to avoid image processing starvation
     * val jsonProcessingDispatcher = dispatcher.limitedParallelism(3, "Json processor")
     * // At most 1 thread will be doing IO
     * val fileWriterDispatcher = dispatcher.limitedParallelism(1, "File writer")
     * ```
     * Note how in this example the application has an executor with 4 threads, but the total sum of all limits
     * is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism,
     * and at most 4 threads can exist in the system.
     *
     * Note that this example was structured in such a way that it illustrates the parallelism guarantees.
     * In practice, it is usually better to use `Dispatchers.IO` or [Dispatchers.Default] instead of creating a
     * dedicated background `dispatcher` like the one above.
     *
     * ### `limitedParallelism(1)` pattern
     *
     * One of the common patterns is confining the execution of specific tasks to a sequential execution in background
     * with `limitedParallelism(1)` invocation.
     * For that purpose, the implementation guarantees that tasks are executed sequentially and that
     * a happens-before relation is established between them:
     *
     * ```
     * val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher")
     * var counter = 0
     *
     * // Invoked from arbitrary coroutines
     * launch(confined) {
     *     // This increment is sequential and race-free
     *     ++counter
     * }
     * ```
     * Note that there is no guarantee that the underlying system thread will always be the same.
     *
     * ### Dispatchers.IO
     *
     * `Dispatchers.IO` is considered _elastic_ for the purposes of limited parallelism -- the sum of
     * views is not restricted by the capacity of `Dispatchers.IO`.
     * It means that it is safe to replace `newFixedThreadPoolContext(nThreads)` with
     * `Dispatchers.IO.limitedParallelism(nThreads)` w.r.t. available number of threads.
     * See `Dispatchers.IO` documentation for more details.
     *
     * ### Restrictions and implementation details
     *
     * The default implementation of `limitedParallelism` does not support direct dispatchers,
     * such as executing the given runnable in place during [dispatch] calls.
     * Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
     * For direct dispatchers, it is recommended to override this method
     * and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
     *
     * Implementations of this method are allowed to return `this` if the current dispatcher already satisfies
     * the parallelism requirement.
     * For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher
     * is already single-threaded.
     *
     * @param parallelism the maximum number of coroutines the resulting view may execute at the same time;
     * must be positive.
     * @param name optional name for the resulting dispatcher string representation if a new dispatcher was created.
     * Implementations are free to ignore this parameter.
     * @throws IllegalArgumentException if the given [parallelism] is non-positive
     * @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views
     */
    public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher {
        // Validate the requested limit before the view is created.
        parallelism.checkParallelism()
        return LimitedDispatcher(this, parallelism, name)
    }

    // Was experimental since 1.6.0, deprecated since 1.8.x
    @Deprecated(
        message = "Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead",
        replaceWith = ReplaceWith("limitedParallelism(parallelism, null)"),
        level = DeprecationLevel.HIDDEN
    )
    public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        limitedParallelism(parallelism, name = null)

    /**
     * Requests execution of a runnable [block].
     * The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool,
     * using a dedicated thread, or just executing the block in place.
     * The [context] parameter represents the context of the coroutine that is being dispatched,
     * or [EmptyCoroutineContext] if a non-coroutine-specific [Runnable] is dispatched instead.
     * Implementations may use [context] for additional context-specific information,
     * such as priority, whether the dispatched coroutine can be invoked in place,
     * coroutine name, and additional diagnostic elements.
     *
     * This method should guarantee that the given [block] will be eventually invoked,
     * otherwise the system may reach a deadlock state and never leave it.
     * The cancellation mechanism is transparent for [CoroutineDispatcher] and is managed by [block] internals.
     *
     * This method should generally be exception-safe. An exception thrown from this method
     * may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
     *
     * This method must not immediately call [block]. Doing so may result in `StackOverflowError`
     * when `dispatch` is invoked repeatedly, for example when [yield] is called in a loop.
     * In order to execute a block in place, it is required to return `false` from [isDispatchNeeded]
     * and delegate the `dispatch` implementation to `Dispatchers.Unconfined.dispatch` in such cases.
     * To support this, the coroutines machinery ensures in-place execution and forms an event-loop to
     * avoid unbound recursion.
     *
     * @see isDispatchNeeded
     * @see Dispatchers.Unconfined
     */
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)

    /**
     * Dispatches execution of a runnable `block` onto another thread in the given `context`
     * with a hint for the dispatcher that the current dispatch is triggered by a [yield] call, so that
     * the execution of this continuation may be delayed in favor of already dispatched coroutines.
     *
     * Though the `yield` marker may be passed as a part of [context], this
     * is a separate method for performance reasons.
     *
     * The default implementation simply delegates to [dispatch].
     *
     * @suppress **This is an internal API and should not be used from general code.**
     */
    @InternalCoroutinesApi
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        dispatch(context, block)

    /**
     * Returns a continuation that wraps the provided [continuation], thus intercepting all resumptions.
     *
     * This method should generally be exception-safe. An exception thrown from this method
     * may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
     */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

    /**
     * Casts [continuation] back to the `DispatchedContinuation` produced by [interceptContinuation]
     * and calls `release` on it.
     */
    public final override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        // The unconditional cast is safe here: `interceptContinuation` only ever returns a DispatchedContinuation,
        // so a ClassCastException can only indicate a compiler bug.
        (continuation as DispatchedContinuation<*>).release()
    }

    /**
     * @suppress **Error**: Operator '+' on two CoroutineDispatcher objects is meaningless.
     * CoroutineDispatcher is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
     * The dispatcher to the right of `+` just replaces the dispatcher to the left.
     */
    @Suppress("DeprecatedCallableAddReplaceWith")
    @Deprecated(
        message = "Operator '+' on two CoroutineDispatcher objects is meaningless. " +
            "CoroutineDispatcher is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " +
            "The dispatcher to the right of `+` just replaces the dispatcher to the left.",
        level = DeprecationLevel.ERROR
    )
    public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other

    /** @suppress for nicer debugging */
    override fun toString(): String = "$classSimpleName@$hexAddress"
}