Skip to content

Commit 60f9bea

Browse files
committed
Introduce SegmentQueueSynchronizer abstraction for synchronization primitives and ReadWriteMutex
1 parent 7c6c72f commit 60f9bea

16 files changed

+2732
-208
lines changed

Diff for: .idea/dictionaries/shared.xml

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import org.openjdk.jmh.annotations.*
1414
import java.util.concurrent.ForkJoinPool
1515
import java.util.concurrent.TimeUnit
1616

17-
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
18-
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
17+
@Warmup(iterations = 2, time = 500, timeUnit = TimeUnit.MICROSECONDS)
18+
@Measurement(iterations = 5, time = 500, timeUnit = TimeUnit.MICROSECONDS)
1919
@Fork(value = 1)
2020
@BenchmarkMode(Mode.AverageTime)
2121
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -31,7 +31,6 @@ open class SemaphoreBenchmark {
3131
private var _3_maxPermits: Int = 0
3232

3333
@Param("1", "2", "4") // local machine
34-
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
3534
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
3635
private var _4_parallelism: Int = 0
3736

@@ -51,7 +50,7 @@ open class SemaphoreBenchmark {
5150
val semaphore = Semaphore(_3_maxPermits)
5251
val jobs = ArrayList<Job>(coroutines)
5352
repeat(coroutines) {
54-
jobs += GlobalScope.launch {
53+
jobs += GlobalScope.launch(dispatcher) {
5554
repeat(n) {
5655
semaphore.withPermit {
5756
doGeomDistrWork(WORK_INSIDE)
@@ -69,7 +68,7 @@ open class SemaphoreBenchmark {
6968
val semaphore = Channel<Unit>(_3_maxPermits)
7069
val jobs = ArrayList<Job>(coroutines)
7170
repeat(coroutines) {
72-
jobs += GlobalScope.launch {
71+
jobs += GlobalScope.launch(dispatcher) {
7372
repeat(n) {
7473
semaphore.send(Unit) // acquire
7574
doGeomDistrWork(WORK_INSIDE)
@@ -89,4 +88,4 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor
8988

9089
private const val WORK_INSIDE = 80
9190
private const val WORK_OUTSIDE = 40
92-
private const val BATCH_SIZE = 1000000
91+
private const val BATCH_SIZE = 1_000_000

Diff for: gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ junit_version=4.12
1212
atomicfu_version=0.15.2
1313
knit_version=0.2.3
1414
html_version=0.6.8
15-
lincheck_version=2.10
15+
lincheck_version=2.11
1616
dokka_version=0.9.16-rdev-2-mpp-hacks
1717
byte_buddy_version=1.10.9
1818
reactor_version=3.4.1

Diff for: kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+12
Original file line numberDiff line numberDiff line change
@@ -1281,6 +1281,18 @@ public final class kotlinx/coroutines/sync/MutexKt {
12811281
public static synthetic fun withLock$default (Lkotlinx/coroutines/sync/Mutex;Ljava/lang/Object;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
12821282
}
12831283

1284+
public abstract interface class kotlinx/coroutines/sync/ReadWriteMutex {
1285+
public abstract fun getWrite ()Lkotlinx/coroutines/sync/Mutex;
1286+
public abstract fun readLock (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1287+
public abstract fun readUnlock ()V
1288+
}
1289+
1290+
public final class kotlinx/coroutines/sync/ReadWriteMutexKt {
1291+
public static final fun ReadWriteMutex ()Lkotlinx/coroutines/sync/ReadWriteMutex;
1292+
public static final fun read (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1293+
public static final fun write (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1294+
}
1295+
12841296
public abstract interface class kotlinx/coroutines/sync/Semaphore {
12851297
public abstract fun acquire (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12861298
public abstract fun getAvailablePermits ()I

Diff for: kotlinx-coroutines-core/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ static void configureJvmForLincheck(task) {
242242
task.maxHeapSize = '6g' // we may need more space for building an interleaving tree in the model checking mode
243243
task.jvmArgs = ['--add-opens', 'java.base/jdk.internal.misc=ALL-UNNAMED', // required for transformation
244244
'--add-exports', 'java.base/jdk.internal.util=ALL-UNNAMED'] // in the model checking mode
245-
task.systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
246-
task.systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '1' // better for the model checking mode
245+
task.systemProperty 'kotlinx.coroutines.sqs.segmentSize', '2'
246+
task.systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '1' // better for the model checking mode
247247
}
248248

249249
task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {

Diff for: kotlinx-coroutines-core/common/src/Debug.common.kt

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ internal expect val DEBUG: Boolean
88
internal expect val Any.hexAddress: String
99
internal expect val Any.classSimpleName: String
1010
internal expect fun assert(value: () -> Boolean)
11+
internal inline fun assertNot(crossinline value: () -> Boolean) = assert { !value() }
1112

1213
/**
1314
* Throwable which can be cloned during stacktrace recovery in a class-specific way.

Diff for: kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt

+13-10
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,13 @@ internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>
149149
*/
150150
fun remove() {
151151
assert { removed } // The node should be logically removed at first.
152-
assert { !isTail } // The physical tail cannot be removed.
152+
// The physical tail cannot be removed. Instead, we remove it when
153+
// a new segment is added and this segment is not the tail one anymore.
154+
if (isTail) return
153155
while (true) {
154156
// Read `next` and `prev` pointers ignoring logically removed nodes.
155-
val prev = leftmostAliveNode
156-
val next = rightmostAliveNode
157+
val prev = aliveSegmentLeft
158+
val next = aliveSegmentRight
157159
// Link `next` and `prev`.
158160
next._prev.value = prev
159161
if (prev !== null) prev._next.value = next
@@ -165,17 +167,17 @@ internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>
165167
}
166168
}
167169

168-
private val leftmostAliveNode: N? get() {
170+
private val aliveSegmentLeft: N? get() {
169171
var cur = prev
170172
while (cur !== null && cur.removed)
171173
cur = cur._prev.value
172174
return cur
173175
}
174176

175-
private val rightmostAliveNode: N get() {
177+
private val aliveSegmentRight: N get() {
176178
assert { !isTail } // Should not be invoked on the tail node
177179
var cur = next!!
178-
while (cur.removed)
180+
while (cur.removed && !cur.isTail)
179181
cur = cur.next!!
180182
return cur
181183
}
@@ -203,19 +205,20 @@ internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers
203205
* There are no pointers to this segment from outside, and
204206
* it is not a physical tail in the linked list of segments.
205207
*/
206-
override val removed get() = cleanedAndPointers.value == maxSlots && !isTail
208+
override val removed get() = cleanedAndPointers.value == maxSlots
207209

208210
// increments the number of pointers if this segment is not logically removed.
209-
internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots || isTail }
211+
internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots }
210212

211213
// returns `true` if this segment is logically removed after the decrement.
212-
internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail
214+
internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots
213215

214216
/**
215217
* Invoked on each slot clean-up; should not be invoked twice for the same slot.
216218
*/
217219
fun onSlotCleaned() {
218-
if (cleanedAndPointers.incrementAndGet() == maxSlots && !isTail) remove()
220+
if (cleanedAndPointers.incrementAndGet() < maxSlots) return
221+
if (removed) remove()
219222
}
220223
}
221224

0 commit comments

Comments
 (0)