Skip to content

Commit d102823

Browse files
committed
Move code from the JVM to common (naively, will not compile)
1 parent 5e86c9f commit d102823

12 files changed

+873
-845
lines changed

Diff for: kotlinx-coroutines-core/common/src/Builders.common.kt

+120
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,126 @@ internal expect class UndispatchedCoroutine<in T>(
211211
uCont: Continuation<T>
212212
) : ScopeCoroutine<T>
213213

214+
// Used by withContext when context changes, but dispatcher stays the same
215+
internal actual class UndispatchedCoroutine<in T>actual constructor (
216+
context: CoroutineContext,
217+
uCont: Continuation<T>
218+
) : ScopeCoroutine<T>(if (context[UndispatchedMarker] == null) context + UndispatchedMarker else context, uCont) {
219+
220+
/**
221+
* The state of [ThreadContextElement]s associated with the current undispatched coroutine.
222+
* It is stored in a thread local because this coroutine can be used concurrently in suspend-resume race scenario.
223+
* See the followin, boiled down example with inlined `withContinuationContext` body:
224+
* ```
225+
* val state = saveThreadContext(ctx)
226+
* try {
227+
* invokeSmthWithThisCoroutineAsCompletion() // Completion implies that 'afterResume' will be called
228+
* // COROUTINE_SUSPENDED is returned
229+
* } finally {
230+
* thisCoroutine().clearThreadContext() // Concurrently the "smth" could've been already resumed on a different thread
231+
* // and it also calls saveThreadContext and clearThreadContext
232+
* }
233+
* ```
234+
*
235+
* Usage note:
236+
*
237+
* This part of the code is performance-sensitive.
238+
* It is a well-established pattern to wrap various activities into system-specific undispatched
239+
* `withContext` for the sake of logging, MDC, tracing etc., meaning that there exists thousands of
240+
* undispatched coroutines.
241+
* Each access to Java's [ThreadLocal] leaves a footprint in the corresponding Thread's `ThreadLocalMap`
242+
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected
243+
* when either the corresponding thread is GC'ed or it cleans up its stale entries on other TL accesses.
244+
* When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and an arbitrary accesses to thread locals
245+
* start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access.
246+
* (You can read more about this effect as "GC nepotism").
247+
*
248+
* To avoid that, we attempt to narrow down the lifetime of this thread local as much as possible:
249+
* - It's never accessed when we are sure there are no thread context elements
250+
* - It's cleaned up via [ThreadLocal.remove] as soon as the coroutine is suspended or finished.
251+
*/
252+
private val threadStateToRecover = ThreadLocal<Pair<CoroutineContext, Any?>>()
253+
254+
/*
255+
* Indicates that a coroutine has at least one thread context element associated with it
256+
* and that 'threadStateToRecover' is going to be set in case of dispatchhing in order to preserve them.
257+
* Better than nullable thread-local for easier debugging.
258+
*
259+
* It is used as a performance optimization to avoid 'threadStateToRecover' initialization
260+
* (note: tl.get() initializes thread local),
261+
* and is prone to false-positives as it is never reset: otherwise
262+
* it may lead to logical data races between suspensions point where
263+
* coroutine is yet being suspended in one thread while already being resumed
264+
* in another.
265+
*/
266+
@Volatile
267+
private var threadLocalIsSet = false
268+
269+
init {
270+
/*
271+
* This is a hack for a very specific case in #2930 unless #3253 is implemented.
272+
* 'ThreadLocalStressTest' covers this change properly.
273+
*
274+
* The scenario this change covers is the following:
275+
* 1) The coroutine is being started as plain non kotlinx.coroutines related suspend function,
276+
* e.g. `suspend fun main` or, more importantly, Ktor `SuspendFunGun`, that is invoking
277+
* `withContext(tlElement)` which creates `UndispatchedCoroutine`.
278+
* 2) It (original continuation) is then not wrapped into `DispatchedContinuation` via `intercept()`
279+
* and goes neither through `DC.run` nor through `resumeUndispatchedWith` that both
280+
* do thread context element tracking.
281+
* 3) So thread locals never got chance to get properly set up via `saveThreadContext`,
282+
* but when `withContext` finishes, it attempts to recover thread locals in its `afterResume`.
283+
*
284+
* Here we detect precisely this situation and properly setup context to recover later.
285+
*
286+
*/
287+
if (uCont.context[ContinuationInterceptor] !is CoroutineDispatcher) {
288+
/*
289+
* We cannot just "read" the elements as there is no such API,
290+
* so we update-restore it immediately and use the intermediate value
291+
* as the initial state, leveraging the fact that thread context element
292+
* is idempotent and such situations are increasingly rare.
293+
*/
294+
val values = updateThreadContext(context, null)
295+
restoreThreadContext(context, values)
296+
saveThreadContext(context, values)
297+
}
298+
}
299+
300+
fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
301+
threadLocalIsSet = true // Specify that thread-local is touched at all
302+
threadStateToRecover.set(context to oldValue)
303+
}
304+
305+
fun clearThreadContext(): Boolean {
306+
return !(threadLocalIsSet && threadStateToRecover.get() == null).also {
307+
threadStateToRecover.remove()
308+
}
309+
}
310+
311+
override fun afterCompletionUndispatched() {
312+
clearThreadLocal()
313+
}
314+
315+
override fun afterResume(state: Any?) {
316+
clearThreadLocal()
317+
// resume undispatched -- update context but stay on the same dispatcher
318+
val result = recoverResult(state, uCont)
319+
withContinuationContext(uCont, null) {
320+
uCont.resumeWith(result)
321+
}
322+
}
323+
324+
private fun clearThreadLocal() {
325+
if (threadLocalIsSet) {
326+
threadStateToRecover.get()?.let { (ctx, value) ->
327+
restoreThreadContext(ctx, value)
328+
}
329+
threadStateToRecover.remove()
330+
}
331+
}
332+
}
333+
214334
private const val UNDECIDED = 0
215335
private const val SUSPENDED = 1
216336
private const val RESUMED = 2

Diff for: kotlinx-coroutines-core/common/src/CoroutineContext.common.kt

+74
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package kotlinx.coroutines
22

3+
import kotlinx.coroutines.internal.*
34
import kotlin.coroutines.*
45

56
/**
@@ -25,3 +26,76 @@ internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, c
2526
internal expect inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T
2627
internal expect fun Continuation<*>.toDebugString(): String
2728
internal expect val CoroutineContext.coroutineName: String?
29+
30+
/**
31+
* Executes a block using a given coroutine context.
32+
*/
33+
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
34+
val oldValue = updateThreadContext(context, countOrElement)
35+
try {
36+
return block()
37+
} finally {
38+
restoreThreadContext(context, oldValue)
39+
}
40+
}
41+
42+
/**
43+
* Executes a block using a context of a given continuation.
44+
*/
45+
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
46+
val context = continuation.context
47+
val oldValue = updateThreadContext(context, countOrElement)
48+
val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) {
49+
// Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them
50+
continuation.updateUndispatchedCompletion(context, oldValue)
51+
} else {
52+
null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context
53+
}
54+
try {
55+
return block()
56+
} finally {
57+
if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext()) {
58+
restoreThreadContext(context, oldValue)
59+
}
60+
}
61+
}
62+
63+
internal fun Continuation<*>.updateUndispatchedCompletion(context: CoroutineContext, oldValue: Any?): UndispatchedCoroutine<*>? {
64+
if (this !is CoroutineStackFrame) return null
65+
/*
66+
* Fast-path to detect whether we have undispatched coroutine at all in our stack.
67+
*
68+
* Implementation note.
69+
* If we ever find that stackwalking for thread-locals is way too slow, here is another idea:
70+
* 1) Store undispatched coroutine right in the `UndispatchedMarker` instance
71+
* 2) To avoid issues with cross-dispatch boundary, remove `UndispatchedMarker`
72+
* from the context when creating dispatched coroutine in `withContext`.
73+
* Another option is to "unmark it" instead of removing to save an allocation.
74+
* Both options should work, but it requires more careful studying of the performance
75+
* and, mostly, maintainability impact.
76+
*/
77+
val potentiallyHasUndispatchedCoroutine = context[UndispatchedMarker] !== null
78+
if (!potentiallyHasUndispatchedCoroutine) return null
79+
val completion = undispatchedCompletion()
80+
completion?.saveThreadContext(context, oldValue)
81+
return completion
82+
}
83+
84+
internal tailrec fun CoroutineStackFrame.undispatchedCompletion(): UndispatchedCoroutine<*>? {
85+
// Find direct completion of this continuation
86+
val completion: CoroutineStackFrame = when (this) {
87+
is DispatchedCoroutine<*> -> return null
88+
else -> callerFrame ?: return null // something else -- not supported
89+
}
90+
if (completion is UndispatchedCoroutine<*>) return completion // found UndispatchedCoroutine!
91+
return completion.undispatchedCompletion() // walk up the call stack with tail call
92+
}
93+
94+
/**
95+
* Marker indicating that [UndispatchedCoroutine] exists somewhere up in the stack.
96+
* Used as a performance optimization to avoid stack walking where it is not necessary.
97+
*/
98+
private object UndispatchedMarker: CoroutineContext.Element, CoroutineContext.Key<UndispatchedMarker> {
99+
override val key: CoroutineContext.Key<*>
100+
get() = this
101+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package kotlinx.coroutines
2+
3+
import kotlin.coroutines.CoroutineContext
4+
import kotlin.coroutines.EmptyCoroutineContext
5+
6+
7+
/**
8+
* Creates a context for a new coroutine. It installs [Dispatchers.Default] when no other dispatcher or
9+
* [ContinuationInterceptor] is specified and adds optional support for debugging facilities (when turned on)
10+
* and copyable-thread-local facilities on JVM.
11+
* See [DEBUG_PROPERTY_NAME] for description of debugging facilities on JVM.
12+
*/
13+
@ExperimentalCoroutinesApi
14+
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
15+
val combined = foldCopies(coroutineContext, context, true)
16+
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
17+
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
18+
debug + Dispatchers.Default else debug
19+
}
20+
21+
/**
22+
* Creates a context for coroutine builder functions that do not launch a new coroutine, e.g. [withContext].
23+
* @suppress
24+
*/
25+
@InternalCoroutinesApi
26+
public actual fun CoroutineContext.newCoroutineContext(addedContext: CoroutineContext): CoroutineContext {
27+
/*
28+
* Fast-path: we only have to copy/merge if 'addedContext' (which typically has one or two elements)
29+
* contains copyable elements.
30+
*/
31+
if (!addedContext.hasCopyableElements()) return this + addedContext
32+
return foldCopies(this, addedContext, false)
33+
}
34+
35+
private fun CoroutineContext.hasCopyableElements(): Boolean =
36+
fold(false) { result, it -> result || it is CopyableThreadContextElement<*> }
37+
38+
/**
39+
* Folds two contexts properly applying [CopyableThreadContextElement] rules when necessary.
40+
* The rules are the following:
41+
* - If neither context has CTCE, the sum of two contexts is returned
42+
* - Every CTCE from the left-hand side context that does not have a matching (by key) element from right-hand side context
43+
* is [copied][CopyableThreadContextElement.copyForChild] if [isNewCoroutine] is `true`.
44+
* - Every CTCE from the left-hand side context that has a matching element in the right-hand side context is [merged][CopyableThreadContextElement.mergeForChild]
45+
* - Every CTCE from the right-hand side context that hasn't been merged is copied
46+
* - Everything else is added to the resulting context as is.
47+
*/
48+
private fun foldCopies(originalContext: CoroutineContext, appendContext: CoroutineContext, isNewCoroutine: Boolean): CoroutineContext {
49+
// Do we have something to copy left-hand side?
50+
val hasElementsLeft = originalContext.hasCopyableElements()
51+
val hasElementsRight = appendContext.hasCopyableElements()
52+
53+
// Nothing to fold, so just return the sum of contexts
54+
if (!hasElementsLeft && !hasElementsRight) {
55+
return originalContext + appendContext
56+
}
57+
58+
var leftoverContext = appendContext
59+
val folded = originalContext.fold<CoroutineContext>(EmptyCoroutineContext) { result, element ->
60+
if (element !is CopyableThreadContextElement<*>) return@fold result + element
61+
// Will this element be overwritten?
62+
val newElement = leftoverContext[element.key]
63+
// No, just copy it
64+
if (newElement == null) {
65+
// For 'withContext'-like builders we do not copy as the element is not shared
66+
return@fold result + if (isNewCoroutine) element.copyForChild() else element
67+
}
68+
// Yes, then first remove the element from append context
69+
leftoverContext = leftoverContext.minusKey(element.key)
70+
// Return the sum
71+
@Suppress("UNCHECKED_CAST")
72+
return@fold result + (element as CopyableThreadContextElement<Any?>).mergeForChild(newElement)
73+
}
74+
75+
if (hasElementsRight) {
76+
leftoverContext = leftoverContext.fold<CoroutineContext>(EmptyCoroutineContext) { result, element ->
77+
// We're appending new context element -- we have to copy it, otherwise it may be shared with others
78+
if (element is CopyableThreadContextElement<*>) {
79+
return@fold result + element.copyForChild()
80+
}
81+
return@fold result + element
82+
}
83+
}
84+
return folded + leftoverContext
85+
}

0 commit comments

Comments
 (0)