Skip to content

Commit 70c9926

Browse files
committed
Reduce the memory consumption slightly
In exchange, now, removal is linear in the size of number of registered handlers instead of being amortized constant.
1 parent 0277752 commit 70c9926

File tree

3 files changed

+65
-74
lines changed

3 files changed

+65
-74
lines changed

Diff for: kotlinx-coroutines-core/common/src/JobSupport.kt

+36-19
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
357357

358358
private fun notifyHandlers(list: NodeList, permissionBitmask: Byte, cause: Throwable?, predicate: (JobNode) -> Boolean) {
359359
var exception: Throwable? = null
360-
list.forEach(forbidBitmask = permissionBitmask) { node ->
360+
list.forEach(forbidBitmask = permissionBitmask) { node, _, _ ->
361361
if (node is JobNode && predicate(node)) {
362362
try {
363363
node.invoke(cause)
@@ -558,7 +558,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
558558

559559
private fun promoteSingleToNodeList(state: JobNode) {
560560
// try to promote it to list (SINGLE+ state)
561-
_state.compareAndSet(state, state.attachToList(NodeList()))
561+
val list = NodeList()
562+
val address = list.addLastWithoutModifying(state, permissionsBitmask = 0)
563+
assert { address == 0L }
564+
_state.compareAndSet(state, list)
562565
}
563566

564567
public final override suspend fun join() {
@@ -621,7 +624,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
621624
}
622625
is Incomplete -> { // may have a list of completion handlers
623626
// remove node from the list if there is a list
624-
if (state.list != null) node.remove()
627+
state.list?.remove(node)
625628
return
626629
}
627630
else -> return // it is complete and does not have any completion handlers
@@ -932,39 +935,52 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
932935
private val Any?.exceptionOrNull: Throwable?
933936
get() = (this as? CompletedExceptionally)?.cause
934937

935-
private fun shouldWaitForChildren(state: Finishing, proposedUpdate: Any?, suggestedStart: ChildHandleNode? = null): Boolean {
938+
private fun shouldWaitForChildren(
939+
state: Finishing,
940+
proposedUpdate: Any?,
941+
suggestedStartSegment: LockFreeLinkedListSegment? = null,
942+
suggestedStartIndex: Int? = null
943+
): Boolean {
936944
val list = state.list
937-
fun tryFindChildren(suggestedStart: ChildHandleNode?, closeList: Boolean): Boolean {
938-
var startAfter: ChildHandleNode? = suggestedStart
945+
fun tryFindChildren(
946+
closeList: Boolean,
947+
suggestedStartSegment: LockFreeLinkedListSegment? = null,
948+
suggestedStartIndex: Int? = null,
949+
): Boolean {
950+
var startSegment = suggestedStartSegment
951+
var startIndex = suggestedStartIndex
939952
while (true) {
940953
val child = run {
941-
list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startAfter = startAfter) {
942-
if (it is ChildHandleNode) return@run it
954+
list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startInSegment = startSegment, startAfterIndex = startIndex) { node, segment, indexInSegment ->
955+
if (node is ChildHandleNode) {
956+
startSegment = segment
957+
startIndex = indexInSegment
958+
return@run node
959+
}
943960
}
944961
null
945962
} ?: break
946963
val handle = child.childJob.invokeOnCompletion(
947964
invokeImmediately = false,
948-
handler = ChildCompletion(this, state, child, proposedUpdate)
965+
handler = ChildCompletion(this, state, startSegment!!, startIndex!!, proposedUpdate)
949966
)
950967
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
951-
startAfter = child
952968
}
953969
return false
954970
}
955971
// Look for children that are currently in the list after the suggested start node.
956-
if (tryFindChildren(suggestedStart = suggestedStart, closeList = false)) return true
972+
if (tryFindChildren(suggestedStartSegment = suggestedStartSegment, suggestedStartIndex = suggestedStartIndex, closeList = false)) return true
957973
// We didn't find anyone in the list after the suggested start node. Let's check the beginning now.
958-
if (suggestedStart != null && tryFindChildren(suggestedStart = null, closeList = false)) return true
974+
if (suggestedStartSegment != null && tryFindChildren(closeList = false)) return true
959975
// Now we know that, at the moment this function started, there were no more children.
960976
// We can close the list for the new children, and if we still don't find any, we can be sure there are none.
961-
return tryFindChildren(suggestedStart = null, closeList = true)
977+
return tryFindChildren(closeList = true)
962978
}
963979

964980
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
965-
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
981+
private fun continueCompleting(state: Finishing, proposedUpdate: Any?, lastSegment: LockFreeLinkedListSegment, lastIndexInSegment: Int) {
966982
assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
967-
if (shouldWaitForChildren(state, proposedUpdate, suggestedStart = lastChild)) return // waiting for the next child
983+
if (shouldWaitForChildren(state, proposedUpdate, suggestedStartSegment = lastSegment, suggestedStartIndex = lastIndexInSegment)) return // waiting for the next child
968984
// no more children, now we are sure; try to update the state
969985
val finalState = finalizeFinishingState(state, proposedUpdate)
970986
afterCompletion(finalState)
@@ -974,7 +990,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
974990
when (val state = this@JobSupport.state) {
975991
is ChildHandleNode -> yield(state.childJob)
976992
is Incomplete -> state.list?.let { list ->
977-
list.forEach { if (it is ChildHandleNode) yield(it.childJob) }
993+
list.forEach { it, _, _ -> if (it is ChildHandleNode) yield(it.childJob) }
978994
}
979995
}
980996
}
@@ -1232,11 +1248,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
12321248
private class ChildCompletion(
12331249
private val parent: JobSupport,
12341250
private val state: Finishing,
1235-
private val child: ChildHandleNode,
1251+
private val segment: LockFreeLinkedListSegment,
1252+
private val indexInSegment: Int,
12361253
private val proposedUpdate: Any?
12371254
) : JobNode() {
12381255
override fun invoke(cause: Throwable?) {
1239-
parent.continueCompleting(state, child, proposedUpdate)
1256+
parent.continueCompleting(state, proposedUpdate, lastSegment = segment, lastIndexInSegment = indexInSegment)
12401257
}
12411258
override val onCancelling: Boolean get() = false
12421259
}
@@ -1477,7 +1494,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete {
14771494
append(state)
14781495
append("}[")
14791496
var first = true
1480-
this@NodeList.forEach { node ->
1497+
this@NodeList.forEach { node, _, _ ->
14811498
if (node is JobNode) {
14821499
if (first) first = false else append(", ")
14831500
append(node)

Diff for: kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt

+25-51
Original file line numberDiff line numberDiff line change
@@ -11,38 +11,10 @@ import kotlin.jvm.*
1111
/** @suppress **This is unstable API and it is subject to change.** */
1212
internal open class LockFreeLinkedListNode {
1313
/**
14-
* Try putting this node into a list.
15-
*
16-
* Returns:
17-
* - The new head of the list if the operation succeeded.
18-
* - The head of the list if someone else concurrently added this node to the list,
19-
* but no other modifications to the list were made.
20-
*/
21-
fun attachToList(head: LockFreeLinkedListHead): LockFreeLinkedListHead {
22-
val newAddress = head.addLastWithoutModifying(this, permissionsBitmask = 0)
23-
assert { newAddress != null }
24-
return if (_address.compareAndSet(null, newAddress)) {
25-
head
26-
} else {
27-
_address.value!!.segment.head
28-
}
29-
}
30-
31-
/**
32-
* Remove this node from the list.
14+
* The default value of 0 means that either the node is not in any list or [LockFreeLinkedListHead.addLast] wasn't
15+
* yet called on it.
3316
*/
34-
open fun remove() {
35-
_address.value?.let {
36-
val segment = it.segment
37-
segment.clearSlot(it.index)
38-
}
39-
}
40-
41-
private val _address = atomic<Address?>(null)
42-
43-
val address: Address get() = _address.value!!
44-
45-
internal fun trySetAddress(address: Address) = this._address.compareAndSet(null, address)
17+
var address: Long = 0
4618
}
4719

4820
/** @suppress **This is unstable API and it is subject to change.** */
@@ -66,13 +38,13 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment(
6638
*/
6739
inline fun forEach(
6840
forbidBitmask: Byte = 0,
69-
startAfter: LockFreeLinkedListNode? = null,
70-
block: (LockFreeLinkedListNode) -> Unit
41+
startInSegment: LockFreeLinkedListSegment? = null,
42+
startAfterIndex: Int? = null,
43+
block: (LockFreeLinkedListNode, LockFreeLinkedListSegment, Int) -> Unit
7144
) {
7245
forbiddenBits.update { it or forbidBitmask.toInt() }
73-
val startAddress = startAfter?.address
74-
var segment: LockFreeLinkedListSegment? = startAddress?.segment ?: head
75-
var startIndex: Int = startAddress?.index?.let { it + 1 } ?: 0
46+
var segment: LockFreeLinkedListSegment? = startInSegment ?: this
47+
var startIndex: Int = startAfterIndex?.let { it + 1 } ?: 0
7648
while (segment != null) {
7749
segment.forEach(forbidBitmask = forbidBitmask, startIndex = startIndex, block = block)
7850
segment = segment.next
@@ -85,17 +57,15 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment(
8557
* and then sets the [node]'s address to the new address.
8658
*/
8759
fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Boolean {
88-
val address = addLastWithoutModifying(node, permissionsBitmask) ?: return false
89-
val success = node.trySetAddress(address)
90-
assert { success }
60+
node.address = addLastWithoutModifying(node, permissionsBitmask) ?: return false
9161
return true
9262
}
9363

9464
/**
9565
* Adds the [node] to the end of the list if every bit in [permissionsBitmask] is still allowed in the list.
9666
* As opposed to [addLast], doesn't modify the [node]'s address.
9767
*/
98-
fun addLastWithoutModifying(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Address? {
68+
fun addLastWithoutModifying(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Long? {
9969
/** First, avoid modifying the list at all if it was already closed for elements like ours. */
10070
if (permissionsBitmask and forbiddenBits.value.toByte() != 0.toByte()) return null
10171
/** Obtain the place from which the desired segment will certainly be reachable. */
@@ -115,13 +85,21 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment(
11585
* to observe the new segment and either break the cell where [node] wants to arrive or process the [node].
11686
* In any case, we have linearizable behavior. */
11787
return if (segment.tryAdd(node, permissionsBitmask = permissionsBitmask, indexInSegment = indexInSegment)) {
118-
Address(segment, indexInSegment)
88+
index
11989
} else {
12090
null
12191
}
12292
}
12393

124-
override val head: LockFreeLinkedListHead get() = this
94+
fun remove(node: LockFreeLinkedListNode) {
95+
val address = node.address
96+
val id = address / SEGMENT_SIZE
97+
var segment: LockFreeLinkedListSegment = this
98+
while (segment.id < id) { segment = segment.next!! }
99+
if (segment.id == id) {
100+
segment.clearSlot((address % SEGMENT_SIZE).toInt(), node)
101+
}
102+
}
125103
}
126104

127105
internal open class LockFreeLinkedListSegment(
@@ -135,15 +113,15 @@ internal open class LockFreeLinkedListSegment(
135113

136114
override val numberOfSlots: Int get() = SEGMENT_SIZE
137115

138-
fun clearSlot(index: Int) {
139-
cells[index].value = null
140-
onSlotCleaned()
116+
fun clearSlot(index: Int, node: LockFreeLinkedListNode) {
117+
if (cells[index].compareAndSet(node, null))
118+
onSlotCleaned()
141119
}
142120

143-
inline fun forEach(forbidBitmask: Byte, startIndex: Int, block: (LockFreeLinkedListNode) -> Unit) {
121+
inline fun forEach(forbidBitmask: Byte, startIndex: Int, block: (LockFreeLinkedListNode, LockFreeLinkedListSegment, Int) -> Unit) {
144122
for (i in startIndex until SEGMENT_SIZE) {
145123
val node = breakCellOrGetValue(forbidBitmask, i)
146-
if (node != null) block(node)
124+
if (node != null) block(node, this, i)
147125
}
148126
}
149127

@@ -183,12 +161,8 @@ internal open class LockFreeLinkedListSegment(
183161
override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
184162
throw UnsupportedOperationException("Cancellation is not supported on LockFreeLinkedList")
185163
}
186-
187-
open val head: LockFreeLinkedListHead get() = prev!!.head
188164
}
189165

190-
internal class Address(@JvmField val segment: LockFreeLinkedListSegment, @JvmField val index: Int)
191-
192166
private fun createSegment(id: Long, prev: LockFreeLinkedListSegment): LockFreeLinkedListSegment =
193167
LockFreeLinkedListSegment(
194168
id = id,

Diff for: kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ class MemoryFootprintTest : TestBase(true) {
1313

1414
@Test
1515
fun testJobSize() {
16-
assertTotalSize(jobWithChildren(1), 104) // original: 112
17-
assertTotalSize(jobWithChildren(2), 320) // original: 192
18-
assertTotalSize(jobWithChildren(3), 392) // original: 248
19-
assertTotalSize(jobWithChildren(4), 464) // original: 304
16+
assertTotalSize(jobWithChildren(1), 112) // original: 112
17+
assertTotalSize(jobWithChildren(2), 288) // original: 192
18+
assertTotalSize(jobWithChildren(3), 344) // original: 248
19+
assertTotalSize(jobWithChildren(4), 400) // original: 304
2020
}
2121

2222
private fun jobWithChildren(numberOfChildren: Int): Job {

0 commit comments

Comments
 (0)