Skip to content

Commit 1770738

Browse files
committed
Initial implementation of JobSupport on top of ConcurrentLinkedList
1 parent 33fdfa8 commit 1770738

File tree

6 files changed

+224
-545
lines changed

6 files changed

+224
-545
lines changed

Diff for: kotlinx-coroutines-core/common/src/JobSupport.kt

+37-43
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import kotlinx.coroutines.internal.*
77
import kotlinx.coroutines.selects.*
88
import kotlin.coroutines.*
99
import kotlin.coroutines.intrinsics.*
10+
import kotlin.experimental.*
1011
import kotlin.js.*
1112
import kotlin.jvm.*
1213

@@ -319,8 +320,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
319320
private fun notifyCancelling(list: NodeList, cause: Throwable) {
320321
// first cancel our own children
321322
onCancelling(cause)
322-
list.close(LIST_CANCELLATION_PERMISSION)
323-
notifyHandlers(list, cause) { it.onCancelling }
323+
notifyHandlers(list, LIST_CANCELLATION_PERMISSION, cause) { it.onCancelling }
324324
// then cancel parent
325325
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
326326
}
@@ -352,13 +352,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
352352
}
353353

354354
private fun NodeList.notifyCompletion(cause: Throwable?) {
355-
close(LIST_ON_COMPLETION_PERMISSION)
356-
notifyHandlers(this, cause) { true }
355+
notifyHandlers(this, LIST_ON_COMPLETION_PERMISSION, cause) { true }
357356
}
358357

359-
private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) {
358+
private fun notifyHandlers(list: NodeList, permissionBitmask: Byte, cause: Throwable?, predicate: (JobNode) -> Boolean) {
360359
var exception: Throwable? = null
361-
list.forEach { node ->
360+
list.forEach(forbidBitmask = permissionBitmask) { node ->
362361
if (node is JobNode && predicate(node)) {
363362
try {
364363
node.invoke(cause)
@@ -925,57 +924,52 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
925924
// process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!)
926925
notifyRootCause?.let { notifyCancelling(list, it) }
927926
// now wait for children
928-
val child = list.nextChild()
929-
if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
930-
return COMPLETING_WAITING_CHILDREN
931-
list.close(LIST_CHILD_PERMISSION)
932-
val anotherChild = list.nextChild()
933-
if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate))
934-
return COMPLETING_WAITING_CHILDREN
927+
if (shouldWaitForChildren(finishing, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
935928
// otherwise -- we have not children left (all were already cancelled?)
936929
return finalizeFinishingState(finishing, proposedUpdate)
937930
}
938931

939932
private val Any?.exceptionOrNull: Throwable?
940933
get() = (this as? CompletedExceptionally)?.cause
941934

942-
// return false when there is no more incomplete children to wait
943-
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
944-
private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
945-
val handle = child.childJob.invokeOnCompletion(
946-
invokeImmediately = false,
947-
handler = ChildCompletion(this, state, child, proposedUpdate)
948-
)
949-
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
950-
val nextChild = state.list.nextChild(startAfter = child) ?: return false
951-
return tryWaitForChild(state, nextChild, proposedUpdate)
935+
private fun shouldWaitForChildren(state: Finishing, proposedUpdate: Any?, suggestedStart: ChildHandleNode? = null): Boolean {
936+
val list = state.list
937+
fun tryFindChildren(suggestedStart: ChildHandleNode?, closeList: Boolean): Boolean {
938+
var startAfter: ChildHandleNode? = suggestedStart
939+
while (true) {
940+
val child = run {
941+
list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startAfter = startAfter) {
942+
if (it is ChildHandleNode) return@run it
943+
}
944+
null
945+
} ?: break
946+
val handle = child.childJob.invokeOnCompletion(
947+
invokeImmediately = false,
948+
handler = ChildCompletion(this, state, child, proposedUpdate)
949+
)
950+
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
951+
startAfter = child
952+
}
953+
return false
954+
}
955+
// Look for children that are currently in the list after the suggested start node.
956+
if (tryFindChildren(suggestedStart = suggestedStart, closeList = false)) return true
957+
// We didn't find anyone in the list after the suggested start node. Let's check the beginning now.
958+
if (suggestedStart != null && tryFindChildren(suggestedStart = null, closeList = false)) return true
959+
// Now we know that, at the moment this function started, there were no more children.
960+
// We can close the list for the new children, and if we still don't find any, we can be sure there are none.
961+
return tryFindChildren(suggestedStart = null, closeList = true)
952962
}
953963

954964
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
955965
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
956966
assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
957-
// figure out if we need to wait for next child
958-
val waitChild = state.list.nextChild(startAfter = lastChild)
959-
// try wait for next child
960-
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
961-
// no more children to wait -- stop accepting children
962-
state.list.close(LIST_CHILD_PERMISSION)
963-
// did any children get added?
964-
val waitChildAgain = state.list.nextChild(startAfter = lastChild)
965-
// try wait for next child
966-
if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) return // waiting for next child
967+
if (shouldWaitForChildren(state, proposedUpdate, suggestedStart = lastChild)) return // waiting for the next child
967968
// no more children, now we are sure; try to update the state
968969
val finalState = finalizeFinishingState(state, proposedUpdate)
969970
afterCompletion(finalState)
970971
}
971972

972-
private fun NodeList.nextChild(startAfter: LockFreeLinkedListNode? = null): ChildHandleNode? {
973-
forEach(startAfter) {
974-
if (it is ChildHandleNode) return it
975-
}
976-
return null
977-
}
978-
979973
public final override val children: Sequence<Job> get() = sequence {
980974
when (val state = this@JobSupport.state) {
981975
is ChildHandleNode -> yield(state.childJob)
@@ -1389,9 +1383,9 @@ private val EMPTY_NEW = Empty(false)
13891383
private val EMPTY_ACTIVE = Empty(true)
13901384

13911385
// bit mask
1392-
private const val LIST_ON_COMPLETION_PERMISSION = 1
1393-
private const val LIST_CHILD_PERMISSION = 2
1394-
private const val LIST_CANCELLATION_PERMISSION = 4
1386+
private const val LIST_ON_COMPLETION_PERMISSION = 1.toByte()
1387+
private const val LIST_CHILD_PERMISSION = 2.toByte()
1388+
private const val LIST_CANCELLATION_PERMISSION = 4.toByte()
13951389

13961390
private class Empty(override val isActive: Boolean) : Incomplete {
13971391
override val list: NodeList? get() = null

Diff for: kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt

+187-11
Original file line numberDiff line numberDiff line change
@@ -2,40 +2,216 @@
22

33
package kotlinx.coroutines.internal
44

5+
import kotlinx.atomicfu.*
6+
import kotlinx.coroutines.*
7+
import kotlin.coroutines.*
8+
import kotlin.experimental.*
9+
import kotlin.jvm.*
10+
511
/** @suppress **This is unstable API and it is subject to change.** */
6-
public expect open class LockFreeLinkedListNode() {
12+
internal open class LockFreeLinkedListNode {
713
/**
8-
* Try putting `node` into a list.
14+
* Try putting this node into a list.
915
*
1016
* Returns:
1117
* - The new head of the list if the operation succeeded.
1218
* - The head of the list if someone else concurrently added this node to the list,
1319
* but no other modifications to the list were made.
14-
* - Some garbage if the list was already edited.
1520
*/
16-
public fun attachToList(node: LockFreeLinkedListHead): LockFreeLinkedListNode
21+
fun attachToList(head: LockFreeLinkedListHead): LockFreeLinkedListHead {
22+
val newAddress = head.addLastWithoutModifying(this, permissionsBitmask = 0)
23+
assert { newAddress != null }
24+
return if (_address.compareAndSet(null, newAddress)) {
25+
head
26+
} else {
27+
_address.value!!.segment.head
28+
}
29+
}
1730

1831
/**
1932
* Remove this node from the list.
2033
*/
21-
public open fun remove()
34+
open fun remove() {
35+
_address.value?.let {
36+
val segment = it.segment
37+
segment.clearSlot(it.index)
38+
}
39+
}
40+
41+
private val _address = atomic<Address?>(null)
42+
43+
val address: Address get() = _address.value!!
44+
45+
internal fun trySetAddress(address: Address) = this._address.compareAndSet(null, address)
2246
}
2347

2448
/** @suppress **This is unstable API and it is subject to change.** */
25-
public expect open class LockFreeLinkedListHead() {
49+
internal open class LockFreeLinkedListHead {
50+
private val head = LockFreeLinkedListSegment(
51+
id = 0,
52+
prev = null,
53+
pointers = 2,
54+
head = this,
55+
)
56+
private val tail = atomic(head)
57+
private val nextElement = atomic(0L)
58+
59+
/**
60+
* The list of bits that are forbidden from entering the list.
61+
*
62+
* TODO: we can store this in the extra bits in [head], there's enough space for that there, and it's never removed.
63+
*/
64+
private val forbiddenBits: AtomicInt = atomic(0)
65+
2666
/**
2767
* Iterates over all non-removed elements in this list, skipping every node until (and including) [startAfter].
2868
*/
29-
public inline fun forEach(startAfter: LockFreeLinkedListNode? = null, block: (LockFreeLinkedListNode) -> Unit)
69+
inline fun forEach(
70+
forbidBitmask: Byte = 0,
71+
startAfter: LockFreeLinkedListNode? = null,
72+
block: (LockFreeLinkedListNode) -> Unit
73+
) {
74+
forbiddenBits.update { it or forbidBitmask.toInt() }
75+
val startAddress = startAfter?.address
76+
var segment: LockFreeLinkedListSegment? = startAddress?.segment ?: head
77+
var startIndex: Int = startAddress?.index?.let { it + 1 } ?: 0
78+
while (segment != null) {
79+
segment.forEach(forbidBitmask = forbidBitmask, startIndex = startIndex, block = block)
80+
segment = segment.next
81+
startIndex = 0
82+
}
83+
}
3084

3185
/**
32-
* Closes the list for anything that requests the permission [forbiddenElementsBit].
33-
* Only a single permission can be forbidden at a time, but this isn't checked.
86+
* Adds the [node] to the end of the list if every bit in [permissionsBitmask] is still allowed in the list,
87+
* and then sets the [node]'s address to the new address.
3488
*/
35-
public fun close(forbiddenElementsBit: Int)
89+
fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Boolean {
90+
val address = addLastWithoutModifying(node, permissionsBitmask) ?: return false
91+
val success = node.trySetAddress(address)
92+
assert { success }
93+
return true
94+
}
3695

3796
/**
3897
* Adds the [node] to the end of the list if every bit in [permissionsBitmask] is still allowed in the list.
98+
* As opposed to [addLast], doesn't modify the [node]'s address.
3999
*/
40-
public fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Int): Boolean
100+
fun addLastWithoutModifying(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Address? {
101+
/** First, avoid modifying the list at all if it was already closed for elements like ours. */
102+
if (permissionsBitmask and forbiddenBits.value.toByte() != 0.toByte()) return null
103+
/** Obtain the place from which the desired segment will certainly be reachable. */
104+
val curTail = tail.value
105+
/** Allocate a place for our element. */
106+
val index = nextElement.getAndIncrement()
107+
/** Find or create a segment where the node can be stored. */
108+
val createNewSegment = ::createSegment // can't just pass the function, as the compiler crashes (KT-67332)
109+
val segmentId = index / SEGMENT_SIZE
110+
val segment = tail.findSegmentAndMoveForward(id = segmentId, curTail, createNewSegment).segment
111+
assert { segment.id == segmentId }
112+
val indexInSegment = (index % SEGMENT_SIZE).toInt()
113+
/** Double-check that it's still not forbidden for the node to enter the list. */
114+
if (permissionsBitmask and forbiddenBits.value.toByte() != 0.toByte()) return null
115+
/** Now we know that the list was still not closed at some point *even after the segment* was created.
116+
* Because [forbiddenBits] is set before [forEach] traverses the list, this means that [forEach] is guaranteed
117+
* to observe the new segment and either break the cell where [node] wants to arrive or process the [node].
118+
* In any case, we have linearizable behavior. */
119+
return if (segment.tryAdd(node, permissionsBitmask = permissionsBitmask, indexInSegment = indexInSegment)) {
120+
Address(segment, indexInSegment)
121+
} else {
122+
null
123+
}
124+
}
41125
}
126+
127+
internal open class LockFreeLinkedListSegment(
128+
id: Long,
129+
prev: LockFreeLinkedListSegment?,
130+
pointers: Int,
131+
/** Used only during promoting of a single node to a list to ensure wait-freedom of the promotion operation.
132+
* Without this, promotion can't be implemented without a (possibly bounded) spin loop: once the node is committed
133+
* to be part of some list, the other threads can't do anything until that one thread sets the state to be the
134+
* head of the list. */
135+
@JvmField val head: LockFreeLinkedListHead,
136+
) : Segment<LockFreeLinkedListSegment>(id = id, prev = prev, pointers = pointers)
137+
{
138+
/** Each cell is a [LockFreeLinkedListNode], a [BrokenForSomeElements], or `null`. */
139+
private val cells = atomicArrayOfNulls<Any>(SEGMENT_SIZE)
140+
141+
override val numberOfSlots: Int get() = SEGMENT_SIZE
142+
143+
fun clearSlot(index: Int) {
144+
cells[index].value = null
145+
onSlotCleaned()
146+
}
147+
148+
inline fun forEach(forbidBitmask: Byte, startIndex: Int, block: (LockFreeLinkedListNode) -> Unit) {
149+
for (i in startIndex until SEGMENT_SIZE) {
150+
val node = breakCellOrGetValue(forbidBitmask, i)
151+
if (node != null) block(node)
152+
}
153+
}
154+
155+
private fun breakCellOrGetValue(forbidBitmask: Byte, index: Int): LockFreeLinkedListNode? {
156+
while (true) {
157+
val value = cells[index].value
158+
if (value is BrokenForSomeElements?) {
159+
val newForbiddenBits = value.forbiddenBits or forbidBitmask
160+
if (newForbiddenBits == value.forbiddenBits
161+
|| cells[index].compareAndSet(value, BrokenForSomeElements.fromBitmask(newForbiddenBits)))
162+
return null
163+
} else {
164+
return value as LockFreeLinkedListNode
165+
}
166+
}
167+
}
168+
169+
/**
170+
* Adds the [node] to the array of cells if the slot wasn't broken.
171+
*/
172+
fun tryAdd(node: LockFreeLinkedListNode, permissionsBitmask: Byte, indexInSegment: Int): Boolean {
173+
if (cells[indexInSegment].compareAndSet(null, node)) return true
174+
cells[indexInSegment].loop { value ->
175+
// This means that some elements are forbidden from entering the list.
176+
value as BrokenForSomeElements
177+
// Are *we* forbidden from entering the list?
178+
if (value.forbiddenBits and permissionsBitmask != 0.toByte()) {
179+
cells[indexInSegment].value = BrokenForSomeElements.FULLY_BROKEN
180+
onSlotCleaned()
181+
return false
182+
}
183+
// We aren't forbidden. Let's try entering it.
184+
if (cells[indexInSegment].compareAndSet(value, node)) return true
185+
}
186+
}
187+
188+
override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
189+
throw UnsupportedOperationException("Cancellation is not supported on LockFreeLinkedList")
190+
}
191+
}
192+
193+
internal class Address(@JvmField val segment: LockFreeLinkedListSegment, @JvmField val index: Int)
194+
195+
private fun createSegment(id: Long, prev: LockFreeLinkedListSegment): LockFreeLinkedListSegment =
196+
LockFreeLinkedListSegment(
197+
id = id,
198+
prev = prev,
199+
pointers = 0,
200+
head = prev.head
201+
)
202+
203+
private const val SEGMENT_SIZE = 8
204+
205+
@JvmInline
206+
private value class BrokenForSomeElements private constructor(val forbiddenBits: Byte) {
207+
companion object {
208+
fun fromBitmask(forbiddenBits: Byte): BrokenForSomeElements? = when (forbiddenBits) {
209+
0.toByte() -> null // no one is forbidden
210+
else -> BrokenForSomeElements(forbiddenBits)
211+
}
212+
213+
val FULLY_BROKEN = BrokenForSomeElements(255.toByte())
214+
}
215+
}
216+
217+
private val BrokenForSomeElements?.forbiddenBits get() = this?.forbiddenBits ?: 0

0 commit comments

Comments
 (0)