Skip to content

Commit e1e1c3d

Browse files
committed
Fix a flaky test based on #4204
1 parent 7b9c927 commit e1e1c3d

File tree

3 files changed

+78
-117
lines changed

3 files changed

+78
-117
lines changed

Diff for: kotlinx-coroutines-core/common/test/ThreadContextElementTest.kt

-36
Original file line numberDiff line numberDiff line change
@@ -75,42 +75,6 @@ class ThreadContextElementTest : TestBase() {
7575
job.join()
7676
assertNull(threadContextElementThreadLocal.get())
7777
}
78-
79-
@Test
80-
fun testWithContextJobAccess() = runTest {
81-
val captor = JobCaptor()
82-
val manuallyCaptured = ArrayList<Job>()
83-
withContext(captor) {
84-
manuallyCaptured += coroutineContext.job
85-
withContext(CoroutineName("undispatched")) {
86-
manuallyCaptured += coroutineContext.job
87-
withContext(Dispatchers.Default) {
88-
manuallyCaptured += coroutineContext.job
89-
}
90-
// Context restored, captured again
91-
manuallyCaptured += coroutineContext.job
92-
}
93-
// Context restored, captured again
94-
manuallyCaptured += coroutineContext.job
95-
}
96-
assertEquals(manuallyCaptured, captor.capturees)
97-
}
98-
}
99-
100-
private class JobCaptor() : ThreadContextElement<Unit> {
101-
102-
val capturees: MutableList<Job> = mutableListOf()
103-
104-
companion object Key : CoroutineContext.Key<MyElement>
105-
106-
override val key: CoroutineContext.Key<*> get() = Key
107-
108-
override fun updateThreadContext(context: CoroutineContext) {
109-
capturees.add(context.job)
110-
}
111-
112-
override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
113-
}
11478
}
11579

11680
internal class MyData

Diff for: kotlinx-coroutines-core/concurrent/test/ThreadContextElementConcurrentTest.kt

+78
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kotlinx.coroutines
22

33
import kotlinx.coroutines.testing.*
4+
import kotlin.coroutines.CoroutineContext
45
import kotlin.test.*
56

67
class ThreadContextElementConcurrentTest : TestBase() {
@@ -56,4 +57,81 @@ class ThreadContextElementConcurrentTest : TestBase() {
5657
" ThreadContextElement that did not override `copyForChildCoroutine()`"
5758
)
5859
}
60+
61+
62+
63+
class JobCaptor(val capturees: MutableList<String> = mutableListOf()) : ThreadContextElement<Unit> {
64+
65+
companion object Key : CoroutineContext.Key<MyElement>
66+
67+
override val key: CoroutineContext.Key<*> get() = Key
68+
69+
override fun updateThreadContext(context: CoroutineContext) {
70+
capturees.add("Update: ${context.job}")
71+
}
72+
73+
override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
74+
capturees.add("Restore: ${context.job}")
75+
}
76+
}
77+
78+
/**
79+
* For stability of the test, it is important to make sure that
80+
* the parent job actually suspends when calling
81+
* `withContext(dispatcher2 + CoroutineName("dispatched"))`.
82+
*
83+
* Here this requirement is fulfilled by forcing execution on a single thread.
84+
* However, dispatching is performed with two non-equal dispatchers to force dispatching.
85+
*
86+
* Suspend of the parent coroutine [kotlinx.coroutines.DispatchedCoroutine.trySuspend] is out of the control of the test,
87+
* while being executed concurrently with resume of the child coroutine [kotlinx.coroutines.DispatchedCoroutine.tryResume].
88+
*/
89+
@Test
90+
fun testWithContextJobAccess() = runTest {
91+
// Emulate non-equal dispatchers
92+
newFixedThreadPoolContext(1, "testWithContextJobAccess").use { executor ->
93+
val dispatcher1 = executor.limitedParallelism(1, "dispatcher1")
94+
val dispatcher2 = executor.limitedParallelism(1, "dispatcher2")
95+
val captor = JobCaptor()
96+
val manuallyCaptured = mutableListOf<String>()
97+
98+
fun registerUpdate(job: Job?) = manuallyCaptured.add("Update: $job")
99+
fun registerRestore(job: Job?) = manuallyCaptured.add("Restore: $job")
100+
101+
var rootJob: Job? = null
102+
withContext(captor + dispatcher1) {
103+
rootJob = coroutineContext.job
104+
registerUpdate(rootJob)
105+
var undispatchedJob: Job? = null
106+
withContext(CoroutineName("undispatched")) {
107+
undispatchedJob = coroutineContext.job
108+
registerUpdate(undispatchedJob)
109+
// These 2 restores and the corresponding next 2 updates happen only if the following `withContext`
110+
// call actually suspends.
111+
registerRestore(undispatchedJob)
112+
registerRestore(rootJob)
113+
// Without forcing of single backing thread the code inside `withContext`
114+
// may already complete at the moment when the parent coroutine decides
115+
// whether it needs to suspend or not.
116+
var dispatchedJob: Job? = null
117+
withContext(dispatcher2 + CoroutineName("dispatched")) {
118+
dispatchedJob = coroutineContext.job
119+
registerUpdate(dispatchedJob)
120+
}
121+
registerRestore(dispatchedJob)
122+
// Context restored, captured again
123+
registerUpdate(undispatchedJob)
124+
}
125+
registerRestore(undispatchedJob)
126+
// Context restored, captured again
127+
registerUpdate(rootJob)
128+
}
129+
registerRestore(rootJob)
130+
131+
// Restores may be called concurrently to the update calls in other threads, so their order is not checked.
132+
val expected = manuallyCaptured.filter { it.startsWith("Update: ") }.joinToString(separator = "\n")
133+
val actual = captor.capturees.filter { it.startsWith("Update: ") }.joinToString(separator = "\n")
134+
assertEquals(expected, actual)
135+
}
136+
}
59137
}

Diff for: kotlinx-coroutines-core/jvm/test/ThreadContextElementJvmTest.kt

-81
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@ package kotlinx.coroutines
33
import kotlinx.coroutines.flow.*
44
import kotlinx.coroutines.testing.*
55
import org.junit.Test
6-
import java.util.concurrent.CopyOnWriteArrayList
7-
import java.util.concurrent.ExecutorService
8-
import java.util.concurrent.Executors
96
import kotlin.coroutines.*
107
import kotlin.test.*
118

@@ -63,83 +60,6 @@ class ThreadContextElementJvmTest : TestBase() {
6360
}
6461
}
6562

66-
class JobCaptor(val capturees: MutableList<String> = CopyOnWriteArrayList()) : ThreadContextElement<Unit> {
67-
68-
companion object Key : CoroutineContext.Key<MyElement>
69-
70-
override val key: CoroutineContext.Key<*> get() = Key
71-
72-
override fun updateThreadContext(context: CoroutineContext) {
73-
capturees.add("Update: ${context.job}")
74-
}
75-
76-
override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
77-
capturees.add("Restore: ${context.job}")
78-
}
79-
}
80-
81-
/**
82-
* For stability of the test, it is important to make sure that
83-
* the parent job actually suspends when calling
84-
* `withContext(dispatcher2 + CoroutineName("dispatched"))`.
85-
*
86-
* Here this requirement is fulfilled by forcing execution on a single thread.
87-
* However, dispatching is performed with two non-equal dispatchers to force dispatching.
88-
*
89-
* Suspend of the parent coroutine [kotlinx.coroutines.DispatchedCoroutine.trySuspend] is out of the control of the test,
90-
* while being executed concurrently with resume of the child coroutine [kotlinx.coroutines.DispatchedCoroutine.tryResume].
91-
*/
92-
@Test
93-
fun testWithContextJobAccess() = runTest {
94-
val executor = Executors.newSingleThreadExecutor()
95-
// Emulate non-equal dispatchers
96-
val executor1 = object : ExecutorService by executor {}
97-
val executor2 = object : ExecutorService by executor {}
98-
val dispatcher1 = executor1.asCoroutineDispatcher()
99-
val dispatcher2 = executor2.asCoroutineDispatcher()
100-
val captor = JobCaptor()
101-
val manuallyCaptured = mutableListOf<String>()
102-
103-
fun registerUpdate(job: Job?) = manuallyCaptured.add("Update: $job")
104-
fun registerRestore(job: Job?) = manuallyCaptured.add("Restore: $job")
105-
106-
var rootJob: Job? = null
107-
runBlocking(captor + dispatcher1) {
108-
rootJob = coroutineContext.job
109-
registerUpdate(rootJob)
110-
var undispatchedJob: Job? = null
111-
withContext(CoroutineName("undispatched")) {
112-
undispatchedJob = coroutineContext.job
113-
registerUpdate(undispatchedJob)
114-
// These 2 restores and the corresponding next 2 updates happen only if the following `withContext`
115-
// call actually suspends.
116-
registerRestore(undispatchedJob)
117-
registerRestore(rootJob)
118-
// Without forcing of single backing thread the code inside `withContext`
119-
// may already complete at the moment when the parent coroutine decides
120-
// whether it needs to suspend or not.
121-
var dispatchedJob: Job? = null
122-
withContext(dispatcher2 + CoroutineName("dispatched")) {
123-
dispatchedJob = coroutineContext.job
124-
registerUpdate(dispatchedJob)
125-
}
126-
registerRestore(dispatchedJob)
127-
// Context restored, captured again
128-
registerUpdate(undispatchedJob)
129-
}
130-
registerRestore(undispatchedJob)
131-
// Context restored, captured again
132-
registerUpdate(rootJob)
133-
}
134-
registerRestore(rootJob)
135-
136-
// Restores may be called concurrently to the update calls in other threads, so their order is not checked.
137-
val expected = manuallyCaptured.filter { it.startsWith("Update: ") }.joinToString(separator = "\n")
138-
val actual = captor.capturees.filter { it.startsWith("Update: ") }.joinToString(separator = "\n")
139-
assertEquals(expected, actual)
140-
executor.shutdownNow()
141-
}
142-
14363
@Test
14464
fun testThreadLocalFlowOn() = runTest {
14565
val myData = MyData()
@@ -216,4 +136,3 @@ private inline fun <ThreadLocalT, OutputT> ThreadLocal<ThreadLocalT>.setForBlock
216136
block()
217137
set(priorValue)
218138
}
219-

0 commit comments

Comments
 (0)