Skip to content

Commit 51dc99b

Browse files
committed
Introduce SegmentQueueSynchronizer abstraction for synchronization primitives
1 parent 3744f8e commit 51dc99b

File tree

5 files changed

+1207
-176
lines changed

5 files changed

+1207
-176
lines changed

Diff for: kotlinx-coroutines-core/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
119119
enableAssertions = true
120120
testLogging.showStandardStreams = true
121121
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
122-
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
123-
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10'
122+
systemProperty 'kotlinx.coroutines.sqs.segmentSize', '2'
123+
systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '10'
124124
}
125125

126126
task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,359 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.ASYNC
10+
import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.SYNC
11+
import kotlinx.coroutines.internal.SegmentQueueSynchronizer.CancellationMode.*
12+
import kotlinx.coroutines.sync.*
13+
import kotlin.coroutines.*
14+
import kotlin.native.concurrent.*
15+
16+
/**
17+
* [SegmentQueueSynchronizer] is an abstraction for implementing _fair_ synchronization
18+
* and communication primitives that maintains a FIFO queue of waiting requests.
19+
* The two main functions it provides:
20+
* + [suspend] that stores the specified waiter into the queue, and
21+
* + [resume] function that tries to retrieve and resume the first waiter with the specified value.
22+
*
23+
* One may consider the structure as an infinite array with two counters that reference the next cells
24+
* for enqueueing a continuation in [suspend] and for retrieving one in [tryResume]. To be short, when
25+
* [suspend] is invoked, it increments the corresponding counter via fast `Fetch-And-Add` and stores the
26+
* continuation into the cell. At the same time, [tryResume] increments its own counter and comes to the
27+
* corresponding cell.
28+
*
29+
* Since [suspend] can store [CancellableContinuation]-s, it is possible for [tryResume] to fail if the
30+
* continuation is already cancelled. In this case, most of the algorithms retry the whole operation.
31+
* However, some solutions may invoke [tryResume] until it succeeds, so that [SegmentQueueSynchronizer]
32+
* is provided with a nice short-cut [resume], which also efficiently skips consecutive cancelled continuations.
33+
*
34+
* The typical implementations via [SegmentQueueSynchronizer] perform some synchronization at first,
35+
* (e.g., [Semaphore] modifies the number of available permits), and invoke [suspend] or [tryResume]
36+
* after that. Following this pattern, it is possible in a concurrent environment that [tryResume]
37+
* is executed before [suspend] (similarly to the race between `park` and `unpark` for threads),
38+
* so that [tryResume] comes to an empty cell. This race can be solved with two [strategies][Mode]:
39+
* [asynchronous][Mode.ASYNC] and [synchronous][Mode.SYNC].
40+
* In the [synchronous][Mode.ASYNC] mode, [tryResume] puts the element if the cell is empty
41+
* and finishes, the next [suspend] comes to this cell and simply grabs the element without suspension.
42+
* At the same time, in the [synchronous][Mode.SYNC] mode, [tryResume] waits in a bounded spin-loop
43+
* until the put element is taken, marking the cell as broken if it is not after all. In this case both
44+
* the current [tryResume] and the [suspend] that comes to this broken cell fail.
45+
*
46+
* Here is a state machine for cells. Note that only one [suspend] and at most one [tryResume] (or [resume]) operation
47+
* can deal with each cell.
48+
*
49+
* +------+ `suspend` succeeds +------+ `tryResume` tries +------+ // if `cont.tryResume(..)` succeeds, then
50+
* | NULL | -------------------> | cont | -------------------> | DONE | (`cont` IS RETRIEVED) // the corresponding `tryResume` succeeds gets
51+
* +------+ +------+ to resume `cont` +------+ // as well, fails otherwise.
52+
* | |
53+
* | | The suspended request is cancelled and the continuation is
54+
* | `tryResume` comes | replaced with a special `CANCELLED` token to avoid memory leaks.
55+
* | to the cell before V
56+
* | `suspend` and puts +-----------+
57+
* | the element into | CANCELLED | (`cont` IS CANCELLED, `tryResume` FAILS)
58+
* | the cell, waiting +-----------+
59+
* | for `suspend` in
60+
* | ASYNC mode.
61+
* |
62+
* | `suspend` gets +-------+ ( ELIMINATION HAPPENED, )
63+
* | +-----------------> | TAKEN | ( BOTH `tryResume` and )
64+
* V | the element +-------+ ( `suspend` SUCCEED )
65+
* +---------+ |
66+
* | element | --<
67+
* +---------+ |
68+
* |
69+
* | `tryResume` has waited a bounded time +--------+
70+
* +---------------------------------------> | BROKEN | (BOTH `suspend` AND `tryResume` FAIL)
71+
* but `suspend` has not come +--------+
72+
*
73+
* As for the infinite array implementation, it is organized as a linked list of [segments][SQSSegment];
74+
* each segment contains a fixed number of cells. To determine the cell for each [suspend] and [tryResume]
75+
* operation, the algorithm reads the current [tail] or [head], increments [enqIdx] or [deqIdx], and
76+
* finds the required segment starting from the initially read one.
77+
*/
78+
@InternalCoroutinesApi
79+
internal abstract class SegmentQueueSynchronizer<T : Any> {
80+
private val head: AtomicRef<SQSSegment>
81+
private val deqIdx = atomic(0L)
82+
private val tail: AtomicRef<SQSSegment>
83+
private val enqIdx = atomic(0L)
84+
85+
init {
86+
val s = SQSSegment(0, null, 2)
87+
head = atomic(s)
88+
tail = atomic(s)
89+
}
90+
91+
/**
92+
* Specifies whether [resume] should work in
93+
* [synchronous][SYNC] or [asynchronous][ASYNC] mode.
94+
*/
95+
protected open val resumeMode: ResumeMode get() = SYNC
96+
97+
/**
98+
* Specifies whether [resume] should fail on cancelled waiters ([SIMPLE]),
99+
* or skip them in either [synchronous][SMART_SYNC] or [asynchronous][SMART_ASYNC]
100+
* way; in the asynchronous skip mode [resume] may pass the element to the
101+
* cancellation handler in order not to wait, so that the element can be "hung" for a while.
102+
* Specifies whether [resume] should skip cancelled waiters (`true`)
103+
* or fail in this case (`false`). By default, [resume] fails if
104+
* comes to a cancelled waiter.
105+
*/
106+
protected open val cancellationMode: CancellationMode get() = SIMPLE
107+
108+
/**
109+
* This function is invoked when waiter is cancelled. It returns
110+
* `true` if the cancellation succeeds and no additional synchronization
111+
* is required, so that the corresponding cell can be marked as [CANCELLED].
112+
* However, if a concurrent [resume] is already going to resume this waiter,
113+
* [onCancellation] returns `false` so that the element
114+
* and `false` if the element should be resumed blah-
115+
*
116+
* true = the the element can stay in the queue
117+
*/
118+
open fun onCancellation() : Boolean = false
119+
120+
open fun tryReturnRefusedValue(value: T): Boolean = true
121+
122+
open fun returnValue(value: T) {}
123+
124+
private fun returnRefusedValue(value: T) {
125+
if (tryReturnRefusedValue(value)) return
126+
returnValue(value)
127+
}
128+
129+
/**
130+
* TODO
131+
* Returns `false` if the received permit cannot be used and the calling operation should restart.
132+
*/
133+
@Suppress("UNCHECKED_CAST")
134+
fun suspend(cont: Continuation<T>): Boolean {
135+
val curTail = this.tail.value
136+
val enqIdx = enqIdx.getAndIncrement()
137+
val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
138+
createNewSegment = ::createSegment).segment // cannot be closed
139+
val i = (enqIdx % SEGMENT_SIZE).toInt()
140+
// the regular (fast) path -- if the cell is empty, try to install continuation
141+
if (segment.cas(i, null, cont)) { // try to install the continuation
142+
if (cont is CancellableContinuation<*>) {
143+
cont.invokeOnCancellation(SQSCancellationHandler(segment, i).asHandler)
144+
}
145+
return true
146+
}
147+
// On CAS failure -- the cell must either contain a value or be broken.
148+
// Try to grab the value.
149+
val value = segment.get(i)
150+
if (value !== BROKEN && segment.cas(i, value, TAKEN)) { // took the value eliminating suspend/tryResume pair
151+
cont.resume(value as T)
152+
return true
153+
}
154+
assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
155+
return false // broken cell, need to retry on a different cell
156+
}
157+
158+
/**
159+
* Essentially, this is a short-cut for `while (!tryResume(..)) {}`, but
160+
* works in O(1) without contention independently on how many
161+
* [suspended][suspend] continuations has been cancelled.
162+
*/
163+
fun resume(value: T): Boolean {
164+
val skipCancelled = cancellationMode != SIMPLE
165+
while (true) {
166+
when (tryResumeImpl(value, adjustDeqIdx = skipCancelled)) {
167+
TRY_RESUME_SUCCESS -> return true
168+
TRY_RESUME_FAIL_CANCELLED -> if (!skipCancelled) return false
169+
TRY_RESUME_FAIL_BROKEN -> return false
170+
}
171+
}
172+
}
173+
174+
@Suppress("UNCHECKED_CAST")
175+
private fun tryResumeImpl(value: T, adjustDeqIdx: Boolean): Int {
176+
val curHead = this.head.value
177+
val deqIdx = deqIdx.getAndIncrement()
178+
val id = deqIdx / SEGMENT_SIZE
179+
val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
180+
createNewSegment = ::createSegment).segment // cannot be closed
181+
segment.cleanPrev()
182+
if (segment.id > id) {
183+
if (adjustDeqIdx) adjustDeqIdx(segment.id * SEGMENT_SIZE)
184+
return TRY_RESUME_FAIL_CANCELLED
185+
}
186+
val i = (deqIdx % SEGMENT_SIZE).toInt()
187+
modify_cell@while (true) { // modify the cell state
188+
val cellState = segment.get(i)
189+
when {
190+
cellState === null -> {
191+
if (!segment.cas(i, null, value)) continue@modify_cell
192+
// Return immediately in the async mode
193+
if (resumeMode == ASYNC) return TRY_RESUME_SUCCESS
194+
// Acquire has not touched this cell yet, wait until it comes for a bounded time
195+
// The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue
196+
repeat(MAX_SPIN_CYCLES) {
197+
if (segment.get(i) === TAKEN) return TRY_RESUME_SUCCESS
198+
}
199+
// Try to break the slot in order not to wait
200+
return if (segment.cas(i, value, BROKEN)) TRY_RESUME_FAIL_BROKEN else TRY_RESUME_SUCCESS
201+
}
202+
cellState === CANCELLED -> {
203+
return TRY_RESUME_FAIL_CANCELLED
204+
} // the acquire was already cancelled
205+
cellState === REFUSE -> {
206+
returnRefusedValue(value)
207+
return TRY_RESUME_SUCCESS
208+
}
209+
cellState is CancellableContinuation<*> -> {
210+
val success = (cellState as CancellableContinuation<T>).tryResume0(value)
211+
if (success) {
212+
segment.set(i, DONE)
213+
return TRY_RESUME_SUCCESS
214+
} else {
215+
when (cancellationMode) {
216+
SIMPLE -> return TRY_RESUME_FAIL_CANCELLED
217+
SMART_SYNC -> continue@modify_cell
218+
SMART_ASYNC -> {
219+
// Let the cancellation handler decide what to do with the element :)
220+
val valueToStore: Any? = if (value is Continuation<*>) WrappedContinuationValue(value) else value
221+
if (segment.cas(i, cellState, valueToStore)) return TRY_RESUME_SUCCESS
222+
}
223+
}
224+
}
225+
}
226+
else -> {
227+
(cellState as Continuation<T>).resume(value)
228+
segment.set(i, DONE)
229+
return TRY_RESUME_SUCCESS
230+
}
231+
}
232+
}
233+
}
234+
235+
private fun adjustDeqIdx(newValue: Long): Unit = deqIdx.loop { cur ->
236+
if (cur >= newValue) return
237+
if (deqIdx.compareAndSet(cur, newValue)) return
238+
}
239+
240+
/**
241+
* These modes define the strategy that [tryResume] and [resume] should
242+
* use when they come to the cell before [suspend] and find it empty.
243+
* In the [asynchronous][ASYNC] mode, they put the value into the slot,
244+
* so that [suspend] grabs it and immediately resumes. However,
245+
* this strategy produces an incorrect behavior when used for some
246+
* data structures (e.g., for [Semaphore]), and the [synchronous][SYNC]
247+
* mode is used in this case. Similarly to the asynchronous mode,
248+
* [tryResume] and [resume] put the value into the cell, but do not finish
249+
* after that. In opposite, they wait in a bounded spin-loop
250+
* (see [MAX_SPIN_CYCLES]) until the value is taken, marking the cell
251+
* as [broken][BROKEN] and failing if it is not, so that the corresponding
252+
* [suspend] invocation finds the cell [broken][BROKEN] and fails as well.
253+
*/
254+
internal enum class ResumeMode { SYNC, ASYNC }
255+
256+
internal enum class CancellationMode { SIMPLE, SMART_SYNC, SMART_ASYNC }
257+
258+
private inner class SQSCancellationHandler(
259+
private val segment: SQSSegment,
260+
private val index: Int
261+
) : CancelHandler() {
262+
override fun invoke(cause: Throwable?) {
263+
if (cancellationMode === SIMPLE) {
264+
segment.markCancelled(index)
265+
return
266+
}
267+
val cancelled = onCancellation()
268+
if (cancelled) {
269+
val value = segment.markCancelled(index) ?: return
270+
if (resume(value as T)) return
271+
returnValue(value)
272+
} else {
273+
val value = segment.markRefused(index) ?: return
274+
returnRefusedValue(value as T)
275+
}
276+
}
277+
278+
override fun toString() = "SQSCancellationHandler[$segment, $index]"
279+
}
280+
}
281+
282+
private fun <T> CancellableContinuation<T>.tryResume0(value: T): Boolean {
283+
val token = tryResume(value) ?: return false
284+
completeResume(token)
285+
return true
286+
}
287+
288+
private fun createSegment(id: Long, prev: SQSSegment?) = SQSSegment(id, prev, 0)
289+
290+
private class SQSSegment(id: Long, prev: SQSSegment?, pointers: Int) : Segment<SQSSegment>(id, prev, pointers) {
291+
val waiters = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
292+
override val maxSlots: Int get() = SEGMENT_SIZE
293+
294+
@Suppress("NOTHING_TO_INLINE")
295+
inline fun get(index: Int): Any? = waiters[index].value
296+
297+
@Suppress("NOTHING_TO_INLINE")
298+
inline fun set(index: Int, value: Any?) {
299+
waiters[index].value = value
300+
}
301+
302+
@Suppress("NOTHING_TO_INLINE")
303+
inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = waiters[index].compareAndSet(expected, value)
304+
305+
@Suppress("NOTHING_TO_INLINE")
306+
inline fun getAndSet(index: Int, value: Any?): Any? = waiters[index].getAndSet(value)
307+
308+
// Cleans the acquirer slot located by the specified index
309+
// and removes this segment physically if all slots are cleaned.
310+
fun markCancelled(index: Int): Any? = mark(index, CANCELLED).also { res ->
311+
if (res == null) onSlotCleaned()
312+
}
313+
314+
fun markRefused(index: Int): Any? = mark(index, REFUSE)
315+
316+
private fun mark(index: Int, marker: Any?): Any? =
317+
when (val old = getAndSet(index, marker)) {
318+
is Continuation<*> -> {
319+
assert { if (old is CancellableContinuation<*>) old.isCancelled else true }
320+
null
321+
}
322+
is WrappedContinuationValue -> old.cont
323+
else -> old
324+
}
325+
326+
override fun toString() = "SQSSegment[id=$id, hashCode=${hashCode()}]"
327+
}
328+
329+
/**
330+
* In the [smart asynchronous cancellation mode][SegmentQueueSynchronizer.CancellationMode.SMART_ASYNC]
331+
* it is possible that [resume] comes to the cell with cancelled continuation and
332+
* asynchronously puts its value into the cell, so that the cancellation handler decides whether
333+
* this value should be used for resuming the next waiter or be refused. When this
334+
* value is a continuation, it is hard to distinguish it with the one related to the cancelled
335+
* waiter. Thus, such values are wrapped with [WrappedContinuationValue] in this case. Note, that the
336+
* wrapper is required only in [SegmentQueueSynchronizer.CancellationMode.SMART_ASYNC] mode
337+
* and is used in the asynchronous race resolution logic between cancellation and [resume]
338+
* invocation; this way, it is used relatively rare.
339+
*/
340+
private class WrappedContinuationValue(val cont: Continuation<*>)
341+
342+
@SharedImmutable
343+
private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.sqs.segmentSize", 16)
344+
@SharedImmutable
345+
private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.sqs.maxSpinCycles", 100)
346+
@SharedImmutable
347+
private val TAKEN = Symbol("TAKEN")
348+
@SharedImmutable
349+
private val BROKEN = Symbol("BROKEN")
350+
@SharedImmutable
351+
private val CANCELLED = Symbol("CANCELLED")
352+
@SharedImmutable
353+
private val REFUSE = Symbol("REFUSE")
354+
@SharedImmutable
355+
private val DONE = Symbol("DONE")
356+
357+
private const val TRY_RESUME_SUCCESS = 0
358+
private const val TRY_RESUME_FAIL_CANCELLED = 1
359+
private const val TRY_RESUME_FAIL_BROKEN = 2

0 commit comments

Comments
 (0)