1
+ /*
2
+ * Copyright 2016-2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3
+ */
4
+
5
+ package kotlinx.coroutines.gpmc
6
+
7
+ import junit.framework.TestCase.assertEquals
8
+ import kotlinx.coroutines.CoroutineDispatcher
9
+ import kotlinx.coroutines.DispatchException
10
+ import kotlinx.coroutines.testing.TestException
11
+ import org.jetbrains.kotlinx.lincheck.ExperimentalModelCheckingAPI
12
+ import org.jetbrains.kotlinx.lincheck.runConcurrentTest
13
+ import org.junit.*
14
+ import kotlin.coroutines.CoroutineContext
15
+ import kotlin.coroutines.EmptyCoroutineContext
16
+
17
+ @OptIn(ExperimentalModelCheckingAPI ::class )
18
+ class ReproducerGpmc {
19
+
20
+ /* *
21
+ * Checks that even if the dispatcher sporadically fails, the limited dispatcher will still allow reaching the
22
+ * target parallelism level.
23
+ */
24
+ @Test
25
+ fun testLimitedParallelismOfOccasionallyFailingDispatcher () {
26
+ val limit = 5
27
+ var doFail = false
28
+ val workerQueue = mutableListOf<Runnable >()
29
+ val limited = object : CoroutineDispatcher () {
30
+ override fun dispatch (context : CoroutineContext , block : Runnable ) {
31
+ if (doFail) throw TestException ()
32
+ workerQueue.add(block)
33
+ }
34
+ }.limitedParallelism(limit)
35
+ repeat(6 * limit) {
36
+ try {
37
+ limited.dispatch(EmptyCoroutineContext , Runnable { /* do nothing */ })
38
+ } catch (_: DispatchException ) {
39
+ // ignore
40
+ }
41
+ doFail = ! doFail
42
+ }
43
+ assertEquals(limit, workerQueue.size)
44
+ }
45
+
46
+ var iter = 0
47
+
48
+ @Test
49
+ fun testGpmc () {
50
+ runConcurrentTest {
51
+ println (" Iteration: ${iter++ } " )
52
+ val limit = 5
53
+ var doFail = false
54
+ val workerQueue = mutableListOf<Runnable >()
55
+ val limited = object : CoroutineDispatcher () {
56
+ override fun dispatch (context : CoroutineContext , block : kotlinx.coroutines.Runnable ) {
57
+ if (doFail) throw TestException ()
58
+ workerQueue.add(block)
59
+ }
60
+ }.limitedParallelism(limit)
61
+ repeat(limit) {
62
+ try {
63
+ limited.dispatch(EmptyCoroutineContext , Runnable { /* do nothing */ })
64
+ } catch (e: DispatchException ) {
65
+ // ignore
66
+ println (" Ignoring: ${e.message} : $e " )
67
+ }
68
+ doFail = ! doFail
69
+ }
70
+ assertEquals(limit, workerQueue.size)
71
+ }
72
+ }
73
+ }
0 commit comments