From 439ed26f404fff27d94e7767b43a56bc5a9ecf36 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Wed, 11 Dec 2024 18:05:42 +0100 Subject: [PATCH 01/15] Remove counters --- .../common/src/channels/BufferedChannel.kt | 128 ++++++------------ .../src/internal/ConcurrentLinkedList.kt | 107 ++++++++++----- .../common/src/sync/Semaphore.kt | 7 +- .../CancellableContinuationHandlersTest.kt | 3 +- 4 files changed, 123 insertions(+), 122 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index f94a9e9970..a246eddd76 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -90,9 +90,12 @@ internal open class BufferedChannel( private val receiveSegment: AtomicRef> private val bufferEndSegment: AtomicRef> + internal val sendSegmentId: Long get() = sendSegment.value.id + internal val receiveSegmentId: Long get() = receiveSegment.value.id + init { @Suppress("LeakingThis") - val firstSegment = ChannelSegment(id = 0, prev = null, channel = this, pointers = 3) + val firstSegment = ChannelSegment(id = 0, prev = null, channel = this) sendSegment = atomic(firstSegment) receiveSegment = atomic(firstSegment) // If this channel is rendezvous or has unlimited capacity, the algorithm never @@ -301,10 +304,6 @@ internal open class BufferedChannel( when (updateCellSend(segment, i, element, s, waiter, closed)) { RESULT_RENDEZVOUS -> { // A rendezvous with a receiver has happened. - // The previous segments are no longer needed - // for the upcoming requests, so the algorithm - // resets the link to the previous segment. - segment.cleanPrev() return onRendezvousOrBuffered() } RESULT_BUFFERED -> { @@ -325,17 +324,11 @@ internal open class BufferedChannel( } RESULT_CLOSED -> { // This channel is closed. - // In case this segment is already or going to be - // processed by a receiver, ensure that all the - // previous segments are unreachable. - if (s < receiversCounter) segment.cleanPrev() return onClosed() } RESULT_FAILED -> { // Either the cell stores an interrupted receiver, // or it was poisoned by a concurrent receiver. - // In both cases, all the previous segments are already processed, - segment.cleanPrev() continue } RESULT_SUSPEND_NO_WAITER -> { @@ -392,22 +385,16 @@ internal open class BufferedChannel( // restarting the operation from the beginning on failure. // Check the `sendImpl(..)` function for the comments. when (updateCellSend(segment, index, element, s, waiter, false)) { - RESULT_RENDEZVOUS -> { - segment.cleanPrev() - onRendezvousOrBuffered() - } - RESULT_BUFFERED -> { + RESULT_RENDEZVOUS, RESULT_BUFFERED -> { onRendezvousOrBuffered() } RESULT_SUSPEND -> { waiter.prepareSenderForSuspension(segment, index) } RESULT_CLOSED -> { - if (s < receiversCounter) segment.cleanPrev() onClosed() } RESULT_FAILED -> { - segment.cleanPrev() sendImpl( element = element, waiter = waiter, @@ -857,14 +844,9 @@ internal open class BufferedChannel( when { updCellResult === FAILED -> { // The cell is poisoned; restart from the beginning. - // To avoid memory leaks, we also need to reset - // the `prev` pointer of the working segment. - if (r < sendersCounter) segment.cleanPrev() } else -> { // element // A buffered element was retrieved from the cell. - // Clean the reference to the previous segment. - segment.cleanPrev() @Suppress("UNCHECKED_CAST") onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it } } @@ -938,9 +920,6 @@ internal open class BufferedChannel( // but failed: either the opposite request has // already been cancelled or the cell is poisoned. // Restart from the beginning in this case. - // To avoid memory leaks, we also need to reset - // the `prev` pointer of the working segment. - if (r < sendersCounter) segment.cleanPrev() continue } updCellResult === SUSPEND_NO_WAITER -> { @@ -951,8 +930,6 @@ internal open class BufferedChannel( else -> { // element // Either a buffered element was retrieved from the cell // or a rendezvous with a waiting sender has happened. - // Clean the reference to the previous segment before finishing. - segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } @@ -987,7 +964,6 @@ internal open class BufferedChannel( waiter.prepareReceiverForSuspension(segment, index) } updCellResult === FAILED -> { - if (r < sendersCounter) segment.cleanPrev() receiveImpl( waiter = waiter, onElementRetrieved = onElementRetrieved, @@ -996,7 +972,6 @@ internal open class BufferedChannel( ) } else -> { - segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } @@ -1211,7 +1186,7 @@ internal open class BufferedChannel( if (s <= b) { // Should `bufferEndSegment` be moved forward to avoid memory leaks? if (segment.id < id && segment.next != null) - moveSegmentBufferEndToSpecifiedOrLast(id, segment) + bufferEndSegment.moveToSpecifiedOrLast(id, segment) // Increment the number of completed `expandBuffer()`-s and finish. incCompletedExpandBufferAttempts() return @@ -2299,7 +2274,6 @@ internal open class BufferedChannel( // Otherwise, if the required segment is removed, the operation restarts. if (receiveSegment.value.id < id) return false else continue } - segment.cleanPrev() // all the previous segments are no longer needed. // Does the `r`-th cell contain waiting sender or buffered element? val i = (r % SEGMENT_SIZE).toInt() if (isCellNonEmpty(segment, i, r)) return true @@ -2398,12 +2372,6 @@ internal open class BufferedChannel( // This channel is already closed or cancelled; help to complete // the closing or cancellation procedure. completeCloseOrCancel() - // Clean the `prev` reference of the provided segment - // if all the previous cells are already covered by senders. - // It is important to clean the `prev` reference only in - // this case, as the closing/cancellation procedure may - // need correct value to traverse the linked list from right to left. - if (startFrom.id * SEGMENT_SIZE < receiversCounter) startFrom.cleanPrev() // As the required segment is not found and cannot be allocated, return `null`. null } else { @@ -2415,12 +2383,6 @@ internal open class BufferedChannel( // segment with `id` not lower than the required one. // Skip the sequence of removed cells in O(1). updateSendersCounterIfLower(segment.id * SEGMENT_SIZE) - // Clean the `prev` reference of the provided segment - // if all the previous cells are already covered by senders. - // It is important to clean the `prev` reference only in - // this case, as the closing/cancellation procedure may - // need correct value to traverse the linked list from right to left. - if (segment.id * SEGMENT_SIZE < receiversCounter) segment.cleanPrev() // As the required segment is not found and cannot be allocated, return `null`. null } else { @@ -2453,12 +2415,6 @@ internal open class BufferedChannel( // This channel is already closed or cancelled; help to complete // the closing or cancellation procedure. completeCloseOrCancel() - // Clean the `prev` reference of the provided segment - // if all the previous cells are already covered by senders. - // It is important to clean the `prev` reference only in - // this case, as the closing/cancellation procedure may - // need correct value to traverse the linked list from right to left. - if (startFrom.id * SEGMENT_SIZE < sendersCounter) startFrom.cleanPrev() // As the required segment is not found and cannot be allocated, return `null`. null } else { @@ -2474,12 +2430,6 @@ internal open class BufferedChannel( // segment with `id` not lower than the required one. // Skip the sequence of removed cells in O(1). updateReceiversCounterIfLower(segment.id * SEGMENT_SIZE) - // Clean the `prev` reference of the provided segment - // if all the previous cells are already covered by senders. - // It is important to clean the `prev` reference only in - // this case, as the closing/cancellation procedure may - // need correct value to traverse the linked list from right to left. - if (segment.id * SEGMENT_SIZE < sendersCounter) segment.cleanPrev() // As the required segment is already removed, return `null`. null } else { @@ -2504,7 +2454,7 @@ internal open class BufferedChannel( completeCloseOrCancel() // Update `bufferEndSegment` to the last segment // in the linked list to avoid memory leaks. - moveSegmentBufferEndToSpecifiedOrLast(id, startFrom) + bufferEndSegment.moveToSpecifiedOrLast(id, startFrom) // When this function does not find the requested segment, // it should update the number of completed `expandBuffer()` attempts. incCompletedExpandBufferAttempts() @@ -2534,32 +2484,6 @@ internal open class BufferedChannel( } } - /** - * Updates [bufferEndSegment] to the one with the specified [id] or - * to the last existing segment, if the required segment is not yet created. - * - * Unlike [findSegmentBufferEnd], this function does not allocate new segments. - */ - private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment) { - // Start searching the required segment from the specified one. - var segment: ChannelSegment = startFrom - while (segment.id < id) { - segment = segment.next ?: break - } - // Skip all removed segments and try to update `bufferEndSegment` - // to the first non-removed one. This part should succeed eventually, - // as the tail segment is never removed. - while (true) { - while (segment.isRemoved) { - segment = segment.next ?: break - } - // Try to update `bufferEndSegment`. On failure, - // the found segment is already removed, so it - // should be skipped. - if (bufferEndSegment.moveForward(segment)) return - } - } - /** * Updates the `senders` counter if its value * is lower that the specified one. @@ -2588,6 +2512,17 @@ internal open class BufferedChannel( if (receivers.compareAndSet(cur, value)) return } + /** + This method is used in the physical removal of the segment. It helps to move pointers forward from + the segment which was physically removed. + */ + internal fun movePointersForwardFrom(from: ChannelSegment) { + check(from.isRemoved) { "Trying to move channel pointers from the alive segment." } + if (from == sendSegment.value) sendSegment.moveToSpecifiedOrLast(from.id, from) + if (from == receiveSegment.value) receiveSegment.moveToSpecifiedOrLast(from.id, from) + if (from == bufferEndSegment.value) bufferEndSegment.moveToSpecifiedOrLast(from.id, from) + } + // ################### // # Debug Functions # // ################### @@ -2799,7 +2734,7 @@ internal open class BufferedChannel( * to update [BufferedChannel.sendSegment], [BufferedChannel.receiveSegment], * and [BufferedChannel.bufferEndSegment] correctly. */ -internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: BufferedChannel?, pointers: Int) : Segment>(id, prev, pointers) { +internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: BufferedChannel?) : Segment>(id, prev) { private val _channel: BufferedChannel? = channel val channel get() = _channel!! // always non-null except for `NULL_SEGMENT` @@ -2841,6 +2776,26 @@ internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: Bu internal fun getAndSetState(index: Int, update: Any?) = data[index * 2 + 1].getAndSet(update) + // ################################################### + // # Manipulation with the structure of segment list # + // ################################################### + + /** + * Shows if all segments going before this segment have been processed. + * When the value is true, the [prev] reference of the segment should be `null`. + */ + override val isLeftmostOrProcessed: Boolean get() = id <= channel.sendSegmentId && id <= channel.receiveSegmentId + + /** + * Removes the segment physically from the segment list. + * + * If the physical removal was successful and there are channel pointers pointing on this segment, + * the [BufferedChannel.movePointersForwardFrom] method is invoked to move them further on the segment list. + */ + override fun remove(): Boolean = + super.remove().also { + if (it) channel.movePointersForwardFrom(this) + } // ######################## // # Cancellation Support # @@ -2926,10 +2881,9 @@ internal fun createSegmentFunction(): KFunction2, Ch private fun createSegment(id: Long, prev: ChannelSegment) = ChannelSegment( id = id, prev = prev, - channel = prev.channel, - pointers = 0 + channel = prev.channel ) -private val NULL_SEGMENT = ChannelSegment(id = -1, prev = null, channel = null, pointers = 0) +private val NULL_SEGMENT = ChannelSegment(id = -1, prev = null, channel = null) /** * Number of cells in each segment. diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 8f9b13f4ab..3293269bda 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines.internal import kotlinx.atomicfu.* import kotlinx.coroutines.* +import kotlinx.coroutines.channels.SEGMENT_SIZE import kotlin.coroutines.* import kotlin.jvm.* @@ -34,26 +35,57 @@ internal fun > S.findSegmentInternal( return SegmentOrClosed(cur) } +/** + * Returns the segment with the specified [id] or the last one if the required one does not exist (if it was removed + * or was not created yet). + + * Unlike [findSegmentInternal], [findSpecifiedOrLast] does not add new segments to the list. + */ +internal fun > S.findSpecifiedOrLast(id: Long): S { + // Start searching the required segment from the specified one. + var cur: S = this + while (cur.id < id) { + cur = cur.next ?: break + } + return cur +} + /** * Returns `false` if the segment `to` is logically removed, `true` on a successful update. */ @Suppress("NOTHING_TO_INLINE", "RedundantNullableReturnType") // Must be inline because it is an AtomicRef extension internal inline fun > AtomicRef.moveForward(to: S): Boolean = loop { cur -> - if (cur.id >= to.id) return true - if (!to.tryIncPointers()) return false - if (compareAndSet(cur, to)) { // the segment is moved - if (cur.decPointers()) cur.remove() + if (cur.id >= to.id) return true // No need to update the pointer + if (to.isRemoved) return false // Trying to move pointer to the logically removed segment + if (compareAndSet(cur, to)) { // The segment is moved + if (to.isRemoved) return false // The segment was removed in parallel during the `CAS` operation + cleanLeftmostPrev(cur, to) return true } - if (to.decPointers()) to.remove() // undo tryIncPointers +} + +/** + * Cleans the `prev` reference of the leftmost segment in the list. The method works with the sublist which + * boundaries are specified by the given nodes [from] and [to]. It looks for the leftmost segment going from + * the tail to the head of the sublist. + * + * The method is called when [moveForward] successfully updates the value stored in the `AtomicRef` reference. + */ +internal inline fun > cleanLeftmostPrev(from: S, to: S) { + var cur = to + // Find the leftmost segment on the sublist between `from` and `to` segments. + while (!cur.isLeftmostOrProcessed && cur.id > from.id) { + cur = cur.prev ?: + // The `prev` reference was cleaned in parallel. + return + } + if (cur.isLeftmostOrProcessed) cur.cleanPrev() // The leftmost segment is found } /** * Tries to find a segment with the specified [id] following by next references from the * [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values, * doing some synchronization, and invoking this function to find the required segment and update the pointer. - * At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed - * (e.g., queues should use it in dequeue operations). * * Since segments can be removed from the list, or it can be closed for further segment additions. * Returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`, @@ -71,6 +103,27 @@ internal inline fun > AtomicRef.findSegmentAndMoveForward( } } +/** + * Updates the `AtomicRef` reference by moving it to the existing segment. + * + * Unlike [findSegmentAndMoveForward], [moveToSpecifiedOrLast] does not add new segments into the list. + */ +@Suppress("NOTHING_TO_INLINE") +internal inline fun > AtomicRef.moveToSpecifiedOrLast(id: Long, startFrom: S) { + // Start searching the required segment from the specified one. + var s = startFrom.findSpecifiedOrLast(id) + // Skip all removed segments and try to update the channel pointer to the first non-removed one. + // This part should succeed eventually, as the tail segment is never removed. + while (true) { + while (s.isRemoved) { + s = s.next ?: break + } + // Try to update the value of `AtomicRef`. + // On failure, the found segment is already removed, so it should be skipped. + if (moveForward(s)) return + } +} + /** * Closes this linked list of nodes by forbidding adding new ones, * returns the last node in the list. @@ -144,12 +197,14 @@ internal abstract class ConcurrentLinkedListNode /** * Removes this node physically from this linked list. The node should be * logically removed (so [isRemoved] returns `true`) at the point of invocation. + * + * Returns `true`, if the node was physically removed, and `false` otherwise. */ - fun remove() { + open fun remove(): Boolean { assert { isRemoved || isTail } // The node should be logically removed at first. // The physical tail cannot be removed. Instead, we remove it when // a new segment is added and this segment is not the tail one anymore. - if (isTail) return + if (isTail) return false while (true) { // Read `next` and `prev` pointers ignoring logically removed nodes. val prev = aliveSegmentLeft @@ -161,14 +216,14 @@ internal abstract class ConcurrentLinkedListNode if (next.isRemoved && !next.isTail) continue if (prev !== null && prev.isRemoved) continue // This node is removed. - return + return true } } private val aliveSegmentLeft: N? get() { var cur = prev while (cur !== null && cur.isRemoved) - cur = cur._prev.value + cur = cur.prev return cur } @@ -190,7 +245,7 @@ internal abstract class ConcurrentLinkedListNode * instance-check it and uses a separate code-path for that. */ internal abstract class Segment>( - @JvmField val id: Long, prev: S?, pointers: Int + @JvmField val id: Long, prev: S? ) : ConcurrentLinkedListNode(prev), // Segments typically store waiting continuations. Thus, on cancellation, the corresponding // slot should be cleaned and the segment should be removed if it becomes full of cancelled cells. @@ -207,21 +262,20 @@ internal abstract class Segment>( abstract val numberOfSlots: Int /** - * Numbers of cleaned slots (the lowest bits) and AtomicRef pointers to this segment (the highest bits) + * Number of cleaned slots. */ - private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT) + private val cleanedSlots = atomic(0) /** * The segment is considered as removed if all the slots are cleaned * and there are no pointers to this segment from outside. */ - override val isRemoved get() = cleanedAndPointers.value == numberOfSlots && !isTail + override val isRemoved get() = cleanedSlots.value == numberOfSlots && !isTail - // increments the number of pointers if this segment is not logically removed. - internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != numberOfSlots || isTail } - - // returns `true` if this segment is logically removed after the decrement. - internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == numberOfSlots && !isTail + /** + * Shows if all nodes going before this node have been processed. + */ + abstract val isLeftmostOrProcessed: Boolean /** * This function is invoked on continuation cancellation when this segment @@ -240,15 +294,8 @@ internal abstract class Segment>( * Invoked on each slot clean-up; should not be invoked twice for the same slot. */ fun onSlotCleaned() { - if (cleanedAndPointers.incrementAndGet() == numberOfSlots) remove() - } -} - -private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean { - while (true) { - val cur = this.value - if (!condition(cur)) return false - if (this.compareAndSet(cur, cur + delta)) return true + check(cleanedSlots.incrementAndGet() <= SEGMENT_SIZE) { "Some cell was interrupted twice." } + if (isRemoved) remove() } } @@ -259,6 +306,4 @@ internal value class SegmentOrClosed>(private val value: Any?) { val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S } -private const val POINTERS_SHIFT = 16 - private val CLOSED = Symbol("CLOSED") diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 7cc13f81f8..65e8255ef1 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -131,7 +131,7 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm init { require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" } require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" } - val s = SemaphoreSegment(0, null, 2) + val s = SemaphoreSegment(0, null) head = atomic(s) tail = atomic(s) } @@ -356,11 +356,12 @@ private class SemaphoreImpl( permits: Int, acquiredPermits: Int ): SemaphoreAndMutexImpl(permits, acquiredPermits), Semaphore -private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0) +private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev) -private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment(id, prev, pointers) { +private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?) : Segment(id, prev) { val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) override val numberOfSlots: Int get() = SEGMENT_SIZE + override val isLeftmostOrProcessed: Boolean get() = false // Does not impact semaphore implementation @Suppress("NOTHING_TO_INLINE") inline fun get(index: Int): Any? = acquirers[index].value diff --git a/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt b/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt index f005c93e5e..290db7580f 100644 --- a/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt +++ b/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt @@ -160,8 +160,9 @@ class CancellableContinuationHandlersTest : TestBase() { @Test fun testSegmentAsHandler() = runTest { - class MySegment : Segment(0, null, 0) { + class MySegment : Segment(0, null) { override val numberOfSlots: Int get() = 0 + override val isLeftmostOrProcessed: Boolean get() = false var invokeOnCancellationCalled = false override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { From bd5a4c87482dd06e07167fb6921a1ffd8e481dbc Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Tue, 17 Dec 2024 22:07:58 +0100 Subject: [PATCH 02/15] fix update of `bufferEndSegment` --- kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index a246eddd76..24f3fe28a7 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -2422,7 +2422,7 @@ internal open class BufferedChannel( val segment = it.segment // Advance the `bufferEnd` segment if required. if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) { - bufferEndSegment.moveForward(segment) + bufferEndSegment.value.let { bufferEndSegment.moveToSpecifiedOrLast(id, it) } } // Is the required segment removed? if (segment.id > id) { From 58a21eac7761cdf717b9fce82d892acd6817d033 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Tue, 17 Dec 2024 22:23:48 +0100 Subject: [PATCH 03/15] small fix --- .../common/src/channels/BufferedChannel.kt | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 24f3fe28a7..1512729c95 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -302,12 +302,8 @@ internal open class BufferedChannel( // the channel is already closed, storing a waiter is illegal, so // the algorithm stores the `INTERRUPTED_SEND` token in this case. when (updateCellSend(segment, i, element, s, waiter, closed)) { - RESULT_RENDEZVOUS -> { - // A rendezvous with a receiver has happened. - return onRendezvousOrBuffered() - } - RESULT_BUFFERED -> { - // The element has been buffered. + RESULT_BUFFERED, RESULT_RENDEZVOUS -> { + // The element has been buffered or a rendezvous with a receiver has happened. return onRendezvousOrBuffered() } RESULT_SUSPEND -> { @@ -2422,7 +2418,7 @@ internal open class BufferedChannel( val segment = it.segment // Advance the `bufferEnd` segment if required. if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) { - bufferEndSegment.value.let { bufferEndSegment.moveToSpecifiedOrLast(id, it) } + bufferEndSegment.moveToSpecifiedOrLast(id, bufferEndSegment.value) } // Is the required segment removed? if (segment.id > id) { From b099acc00911c8af417929faa1018f4aa6d2515f Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Wed, 18 Dec 2024 15:39:11 +0100 Subject: [PATCH 04/15] change `remove()` --- .../common/src/channels/BufferedChannel.kt | 16 ++++++++-------- .../common/src/internal/ConcurrentLinkedList.kt | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 1512729c95..2747965aa1 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -2512,8 +2512,8 @@ internal open class BufferedChannel( This method is used in the physical removal of the segment. It helps to move pointers forward from the segment which was physically removed. */ - internal fun movePointersForwardFrom(from: ChannelSegment) { - check(from.isRemoved) { "Trying to move channel pointers from the alive segment." } + internal fun movePointersForwardFromRemovedSegment(from: ChannelSegment) { + if (!from.isRemoved) return if (from == sendSegment.value) sendSegment.moveToSpecifiedOrLast(from.id, from) if (from == receiveSegment.value) receiveSegment.moveToSpecifiedOrLast(from.id, from) if (from == bufferEndSegment.value) bufferEndSegment.moveToSpecifiedOrLast(from.id, from) @@ -2785,13 +2785,13 @@ internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: Bu /** * Removes the segment physically from the segment list. * - * If the physical removal was successful and there are channel pointers pointing on this segment, - * the [BufferedChannel.movePointersForwardFrom] method is invoked to move them further on the segment list. + * After the physical removal is finished and there are channel pointers referencing the removed segment, + * the [BufferedChannel.movePointersForwardFromRemovedSegment] method is invoked to move them further on the segment list. */ - override fun remove(): Boolean = - super.remove().also { - if (it) channel.movePointersForwardFrom(this) - } + override fun remove() { + super.remove() + channel.movePointersForwardFromRemovedSegment(this) + } // ######################## // # Cancellation Support # diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 3293269bda..9fe684537f 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -200,11 +200,11 @@ internal abstract class ConcurrentLinkedListNode * * Returns `true`, if the node was physically removed, and `false` otherwise. */ - open fun remove(): Boolean { + open fun remove() { assert { isRemoved || isTail } // The node should be logically removed at first. // The physical tail cannot be removed. Instead, we remove it when // a new segment is added and this segment is not the tail one anymore. - if (isTail) return false + if (isTail) return while (true) { // Read `next` and `prev` pointers ignoring logically removed nodes. val prev = aliveSegmentLeft @@ -216,7 +216,7 @@ internal abstract class ConcurrentLinkedListNode if (next.isRemoved && !next.isTail) continue if (prev !== null && prev.isRemoved) continue // This node is removed. - return true + return } } From ccc89f3c2d9562f3ac7a31a58ae140b4b0a20758 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Wed, 18 Dec 2024 16:54:34 +0100 Subject: [PATCH 05/15] `isLeftmostOrProcessed` in Semaphore --- .../common/src/channels/BufferedChannel.kt | 8 +++++-- .../src/internal/ConcurrentLinkedList.kt | 2 +- .../common/src/sync/Semaphore.kt | 22 ++++++++++++++----- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 2747965aa1..42889ba840 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -90,6 +90,10 @@ internal open class BufferedChannel( private val receiveSegment: AtomicRef> private val bufferEndSegment: AtomicRef> + /** + These values are used in [ChannelSegment.isLeftmostOrProcessed]. + They help to detect when the `prev` reference of the segment should be cleaned. + */ internal val sendSegmentId: Long get() = sendSegment.value.id internal val receiveSegmentId: Long get() = receiveSegment.value.id @@ -146,9 +150,9 @@ internal open class BufferedChannel( the segment and the index in it. */ segment: ChannelSegment, index: Int, - /** The element to be inserted. */ + /* The element to be inserted. */ element: E, - /** The global index of the cell. */ + /* The global index of the cell. */ s: Long ) = suspendCancellableCoroutineReusable sc@{ cont -> sendImplOnNoWaiter( // <-- this is an inline function diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 9fe684537f..68dc9ed830 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -71,7 +71,7 @@ internal inline fun > AtomicRef.moveForward(to: S): Boolean = * * The method is called when [moveForward] successfully updates the value stored in the `AtomicRef` reference. */ -internal inline fun > cleanLeftmostPrev(from: S, to: S) { +private inline fun > cleanLeftmostPrev(from: S, to: S) { var cur = to // Find the leftmost segment on the sublist between `from` and `to` segments. while (!cur.isLeftmostOrProcessed && cur.id > from.id) { diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 65e8255ef1..4f654e2bb9 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -128,10 +128,16 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm private val tail: AtomicRef private val enqIdx = atomic(0L) + /** + This value is used in [SemaphoreSegment.isLeftmostOrProcessed]. + It helps to detect when the `prev` reference of the segment should be cleaned. + */ + internal val headId: Long get() = head.value.id + init { require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" } require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" } - val s = SemaphoreSegment(0, null) + val s = SemaphoreSegment(0, null, this) head = atomic(s) tail = atomic(s) } @@ -317,7 +323,6 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm val createNewSegment = ::createSegment val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead, createNewSegment = createNewSegment).segment // cannot be closed - segment.cleanPrev() if (segment.id > id) return false val i = (deqIdx % SEGMENT_SIZE).toInt() val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state @@ -356,12 +361,19 @@ private class SemaphoreImpl( permits: Int, acquiredPermits: Int ): SemaphoreAndMutexImpl(permits, acquiredPermits), Semaphore -private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev) +private fun createSegment(id: Long, prev: SemaphoreSegment) = SemaphoreSegment( + id = id, + prev = prev, + semaphore = prev.semaphore +) -private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?) : Segment(id, prev) { +private class SemaphoreSegment( + id: Long, prev: SemaphoreSegment?, + val semaphore: SemaphoreAndMutexImpl +) : Segment(id, prev) { val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) override val numberOfSlots: Int get() = SEGMENT_SIZE - override val isLeftmostOrProcessed: Boolean get() = false // Does not impact semaphore implementation + override val isLeftmostOrProcessed: Boolean get() = id <= semaphore.headId @Suppress("NOTHING_TO_INLINE") inline fun get(index: Int): Any? = acquirers[index].value From a21060793c7cb4b81ec6702753a409d023de3fe8 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Mon, 13 Jan 2025 12:08:31 +0100 Subject: [PATCH 06/15] benchmark for a buffered channel --- .../benchmarks/BufferedChannelBenchmark.kt | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt diff --git a/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt new file mode 100644 index 0000000000..fe1ba66ecc --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt @@ -0,0 +1,50 @@ +package benchmarks + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* +import kotlin.coroutines.* + +@Warmup(iterations = 7, time = 1) +@Measurement(iterations = 5, time = 1) +@BenchmarkMode(Mode.Throughput, Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +@Fork(1) +open class BufferedChannelBenchmark { + @Param("1", "2") + var capacity: Int = 0 + + @Benchmark + fun channelPipeline(): Int = runBlocking { + run(Dispatchers.IO.limitedParallelism(3)) + } + + private suspend inline fun run(context: CoroutineContext): Int = + Channel.range(100_000, context) + .filter(context) { it % 4 == 0 } + .fold(0) { a, b -> a + b } + + private fun Channel.Factory.range(count: Int, context: CoroutineContext) = GlobalScope.produce(context, capacity) { + for (i in 0 until count) + send(i) + } + + // Migrated from deprecated operators, are good only for stressing channels + + private fun ReceiveChannel.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (Int) -> Boolean): ReceiveChannel = + GlobalScope.produce(context, onCompletion = { cancel() }) { + for (e in this@filter) { + if (predicate(e)) send(e) + } + } + + private suspend inline fun ReceiveChannel.fold(initial: R, operation: (acc: R, E) -> R): R { + var accumulator = initial + consumeEach { + accumulator = operation(accumulator, it) + } + return accumulator + } +} \ No newline at end of file From a1824e9b6d4bccf272fc06724acee789dc89ee05 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Mon, 13 Jan 2025 13:52:15 +0100 Subject: [PATCH 07/15] add threads annotation --- .../benchmarks/BufferedChannelBenchmark.kt | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt index fe1ba66ecc..bb91b221eb 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt @@ -11,6 +11,7 @@ import kotlin.coroutines.* @BenchmarkMode(Mode.Throughput, Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) @State(Scope.Benchmark) +@Threads(3) @Fork(1) open class BufferedChannelBenchmark { @Param("1", "2") @@ -18,12 +19,12 @@ open class BufferedChannelBenchmark { @Benchmark fun channelPipeline(): Int = runBlocking { - run(Dispatchers.IO.limitedParallelism(3)) + run(Dispatchers.Unconfined) } - private suspend inline fun run(context: CoroutineContext): Int = - Channel.range(100_000, context) - .filter(context) { it % 4 == 0 } + private suspend inline fun run(context: CoroutineContext) = + Channel + .range(100_000, context) .fold(0) { a, b -> a + b } private fun Channel.Factory.range(count: Int, context: CoroutineContext) = GlobalScope.produce(context, capacity) { @@ -33,13 +34,6 @@ open class BufferedChannelBenchmark { // Migrated from deprecated operators, are good only for stressing channels - private fun ReceiveChannel.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (Int) -> Boolean): ReceiveChannel = - GlobalScope.produce(context, onCompletion = { cancel() }) { - for (e in this@filter) { - if (predicate(e)) send(e) - } - } - private suspend inline fun ReceiveChannel.fold(initial: R, operation: (acc: R, E) -> R): R { var accumulator = initial consumeEach { From 2d89b8c4009bfb9d90694352efdb7ac9da2658e1 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Wed, 15 Jan 2025 17:04:11 +0100 Subject: [PATCH 08/15] change benchmarks --- .../src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt | 7 +++++-- .../src/jmh/kotlin/benchmarks/ChannelSinkDepthBenchmark.kt | 7 +++++-- .../kotlin/benchmarks/ChannelSinkNoAllocationsBenchmark.kt | 5 ++++- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt index 51061cce72..17933d5c3f 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt @@ -13,6 +13,9 @@ import kotlin.coroutines.* @State(Scope.Benchmark) @Fork(1) open class ChannelSinkBenchmark { + @Param("${Channel.RENDEZVOUS}", "${Channel.BUFFERED}") + var capacity: Int = 0 + private val tl = ThreadLocal.withInitial({ 42 }) private val tl2 = ThreadLocal.withInitial({ 239 }) @@ -42,7 +45,7 @@ open class ChannelSinkBenchmark { .fold(0) { a, b -> a + b } } - private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) = GlobalScope.produce(context) { + private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) = GlobalScope.produce(context, capacity) { for (i in start until (start + count)) send(i) } @@ -50,7 +53,7 @@ open class ChannelSinkBenchmark { // Migrated from deprecated operators, are good only for stressing channels private fun ReceiveChannel.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel = - GlobalScope.produce(context, onCompletion = { cancel() }) { + GlobalScope.produce(context, capacity, onCompletion = { cancel() }) { for (e in this@filter) { if (predicate(e)) send(e) } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkDepthBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkDepthBenchmark.kt index 18a140f31b..8a8b29fccb 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkDepthBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkDepthBenchmark.kt @@ -13,6 +13,9 @@ import kotlin.coroutines.* @State(Scope.Benchmark) @Fork(2) open class ChannelSinkDepthBenchmark { + @Param("${Channel.RENDEZVOUS}", "${Channel.BUFFERED}") + var capacity: Int = 0 + private val tl = ThreadLocal.withInitial({ 42 }) private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement() @@ -45,7 +48,7 @@ open class ChannelSinkDepthBenchmark { } private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) = - GlobalScope.produce(context) { + GlobalScope.produce(context, capacity) { for (i in start until (start + count)) send(i) } @@ -57,7 +60,7 @@ open class ChannelSinkDepthBenchmark { context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (Int) -> Boolean ): ReceiveChannel = - GlobalScope.produce(context, onCompletion = { cancel() }) { + GlobalScope.produce(context, capacity, onCompletion = { cancel() }) { deeplyNestedFilter(this, callTraceDepth, predicate) } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkNoAllocationsBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkNoAllocationsBenchmark.kt index 8dec5f2a19..9e99f11a5f 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkNoAllocationsBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkNoAllocationsBenchmark.kt @@ -13,6 +13,9 @@ import kotlin.coroutines.* @State(Scope.Benchmark) @Fork(1) open class ChannelSinkNoAllocationsBenchmark { + @Param("${Channel.RENDEZVOUS}", "${Channel.BUFFERED}") + var capacity: Int = 0 + private val unconfined = Dispatchers.Unconfined @Benchmark @@ -26,7 +29,7 @@ open class ChannelSinkNoAllocationsBenchmark { return size } - private fun Channel.Factory.range(context: CoroutineContext) = GlobalScope.produce(context) { + private fun Channel.Factory.range(context: CoroutineContext) = GlobalScope.produce(context, capacity) { for (i in 0 until 100_000) send(Unit) // no allocations } From 7a9906ba16e3770ffff2cd1ac1b856e712a1dd92 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Wed, 15 Jan 2025 19:03:37 +0100 Subject: [PATCH 09/15] remove BufferedChannelBenchmark --- .../benchmarks/BufferedChannelBenchmark.kt | 44 ------------------- 1 file changed, 44 deletions(-) delete mode 100644 benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt diff --git a/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt deleted file mode 100644 index bb91b221eb..0000000000 --- a/benchmarks/src/jmh/kotlin/benchmarks/BufferedChannelBenchmark.kt +++ /dev/null @@ -1,44 +0,0 @@ -package benchmarks - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import org.openjdk.jmh.annotations.* -import java.util.concurrent.* -import kotlin.coroutines.* - -@Warmup(iterations = 7, time = 1) -@Measurement(iterations = 5, time = 1) -@BenchmarkMode(Mode.Throughput, Mode.AverageTime) -@OutputTimeUnit(TimeUnit.MICROSECONDS) -@State(Scope.Benchmark) -@Threads(3) -@Fork(1) -open class BufferedChannelBenchmark { - @Param("1", "2") - var capacity: Int = 0 - - @Benchmark - fun channelPipeline(): Int = runBlocking { - run(Dispatchers.Unconfined) - } - - private suspend inline fun run(context: CoroutineContext) = - Channel - .range(100_000, context) - .fold(0) { a, b -> a + b } - - private fun Channel.Factory.range(count: Int, context: CoroutineContext) = GlobalScope.produce(context, capacity) { - for (i in 0 until count) - send(i) - } - - // Migrated from deprecated operators, are good only for stressing channels - - private suspend inline fun ReceiveChannel.fold(initial: R, operation: (acc: R, E) -> R): R { - var accumulator = initial - consumeEach { - accumulator = operation(accumulator, it) - } - return accumulator - } -} \ No newline at end of file From b1e992a86201c61088560aa5a846b666279c60f0 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Wed, 15 Jan 2025 15:51:41 +0100 Subject: [PATCH 10/15] old version of `cleanPrev` invocations --- .../common/src/channels/BufferedChannel.kt | 39 +++++++++++++++++-- .../src/internal/ConcurrentLinkedList.kt | 23 ++--------- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 42889ba840..bf1796b506 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -306,8 +306,16 @@ internal open class BufferedChannel( // the channel is already closed, storing a waiter is illegal, so // the algorithm stores the `INTERRUPTED_SEND` token in this case. when (updateCellSend(segment, i, element, s, waiter, closed)) { - RESULT_BUFFERED, RESULT_RENDEZVOUS -> { - // The element has been buffered or a rendezvous with a receiver has happened. + RESULT_RENDEZVOUS -> { + // A rendezvous with a receiver has happened. + // The previous segments are no longer needed + // for the upcoming requests, so the algorithm + // resets the link to the previous segment. + segment.cleanPrev() + return onRendezvousOrBuffered() + } + RESULT_BUFFERED -> { + // The element has been buffered. return onRendezvousOrBuffered() } RESULT_SUSPEND -> { @@ -324,11 +332,17 @@ internal open class BufferedChannel( } RESULT_CLOSED -> { // This channel is closed. + // In case this segment is already or going to be + // processed by a receiver, ensure that all the + // previous segments are unreachable. + if (s < receiversCounter) segment.cleanPrev() return onClosed() } RESULT_FAILED -> { // Either the cell stores an interrupted receiver, // or it was poisoned by a concurrent receiver. + // In both cases, all the previous segments are already processed, + segment.cleanPrev() continue } RESULT_SUSPEND_NO_WAITER -> { @@ -385,16 +399,22 @@ internal open class BufferedChannel( // restarting the operation from the beginning on failure. // Check the `sendImpl(..)` function for the comments. when (updateCellSend(segment, index, element, s, waiter, false)) { - RESULT_RENDEZVOUS, RESULT_BUFFERED -> { + RESULT_RENDEZVOUS -> { + segment.cleanPrev() + onRendezvousOrBuffered() + } + RESULT_BUFFERED -> { onRendezvousOrBuffered() } RESULT_SUSPEND -> { waiter.prepareSenderForSuspension(segment, index) } RESULT_CLOSED -> { + if (s < receiversCounter) segment.cleanPrev() onClosed() } RESULT_FAILED -> { + segment.cleanPrev() sendImpl( element = element, waiter = waiter, @@ -844,9 +864,14 @@ internal open class BufferedChannel( when { updCellResult === FAILED -> { // The cell is poisoned; restart from the beginning. + // To avoid memory leaks, we also need to reset + // the `prev` pointer of the working segment. + if (r < sendersCounter) segment.cleanPrev() } else -> { // element // A buffered element was retrieved from the cell. + // Clean the reference to the previous segment. + segment.cleanPrev() @Suppress("UNCHECKED_CAST") onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it } } @@ -920,6 +945,9 @@ internal open class BufferedChannel( // but failed: either the opposite request has // already been cancelled or the cell is poisoned. // Restart from the beginning in this case. + // To avoid memory leaks, we also need to reset + // the `prev` pointer of the working segment. + if (r < sendersCounter) segment.cleanPrev() continue } updCellResult === SUSPEND_NO_WAITER -> { @@ -930,6 +958,8 @@ internal open class BufferedChannel( else -> { // element // Either a buffered element was retrieved from the cell // or a rendezvous with a waiting sender has happened. + // Clean the reference to the previous segment before finishing. + segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } @@ -964,6 +994,7 @@ internal open class BufferedChannel( waiter.prepareReceiverForSuspension(segment, index) } updCellResult === FAILED -> { + if (r < sendersCounter) segment.cleanPrev() receiveImpl( waiter = waiter, onElementRetrieved = onElementRetrieved, @@ -972,6 +1003,7 @@ internal open class BufferedChannel( ) } else -> { + segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } @@ -2274,6 +2306,7 @@ internal open class BufferedChannel( // Otherwise, if the required segment is removed, the operation restarts. if (receiveSegment.value.id < id) return false else continue } + segment.cleanPrev() // all the previous segments are no longer needed. // Does the `r`-th cell contain waiting sender or buffered element? val i = (r % SEGMENT_SIZE).toInt() if (isCellNonEmpty(segment, i, r)) return true diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 68dc9ed830..3f493d7b59 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -59,29 +59,13 @@ internal inline fun > AtomicRef.moveForward(to: S): Boolean = if (to.isRemoved) return false // Trying to move pointer to the logically removed segment if (compareAndSet(cur, to)) { // The segment is moved if (to.isRemoved) return false // The segment was removed in parallel during the `CAS` operation - cleanLeftmostPrev(cur, to) + // In case the segment is reached by both `sendSegment` and `receiveSegment`, + // clean its `prev` link to avoid memory leaks. + if (to.isLeftmostOrProcessed) to.cleanPrev() return true } } -/** - * Cleans the `prev` reference of the leftmost segment in the list. The method works with the sublist which - * boundaries are specified by the given nodes [from] and [to]. It looks for the leftmost segment going from - * the tail to the head of the sublist. - * - * The method is called when [moveForward] successfully updates the value stored in the `AtomicRef` reference. - */ -private inline fun > cleanLeftmostPrev(from: S, to: S) { - var cur = to - // Find the leftmost segment on the sublist between `from` and `to` segments. - while (!cur.isLeftmostOrProcessed && cur.id > from.id) { - cur = cur.prev ?: - // The `prev` reference was cleaned in parallel. - return - } - if (cur.isLeftmostOrProcessed) cur.cleanPrev() // The leftmost segment is found -} - /** * Tries to find a segment with the specified [id] following by next references from the * [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values, @@ -294,6 +278,7 @@ internal abstract class Segment>( * Invoked on each slot clean-up; should not be invoked twice for the same slot. */ fun onSlotCleaned() { + if (isLeftmostOrProcessed) cleanPrev() check(cleanedSlots.incrementAndGet() <= SEGMENT_SIZE) { "Some cell was interrupted twice." } if (isRemoved) remove() } From 597db0516193d48103226155e51d8eca7c071085 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Tue, 28 Jan 2025 15:59:31 +0100 Subject: [PATCH 11/15] new version of `cleanPrev` invocations --- .../common/src/channels/BufferedChannel.kt | 39 ++----------------- .../src/internal/ConcurrentLinkedList.kt | 23 +++++++++-- 2 files changed, 22 insertions(+), 40 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index bf1796b506..42889ba840 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -306,16 +306,8 @@ internal open class BufferedChannel( // the channel is already closed, storing a waiter is illegal, so // the algorithm stores the `INTERRUPTED_SEND` token in this case. when (updateCellSend(segment, i, element, s, waiter, closed)) { - RESULT_RENDEZVOUS -> { - // A rendezvous with a receiver has happened. - // The previous segments are no longer needed - // for the upcoming requests, so the algorithm - // resets the link to the previous segment. - segment.cleanPrev() - return onRendezvousOrBuffered() - } - RESULT_BUFFERED -> { - // The element has been buffered. + RESULT_BUFFERED, RESULT_RENDEZVOUS -> { + // The element has been buffered or a rendezvous with a receiver has happened. return onRendezvousOrBuffered() } RESULT_SUSPEND -> { @@ -332,17 +324,11 @@ internal open class BufferedChannel( } RESULT_CLOSED -> { // This channel is closed. - // In case this segment is already or going to be - // processed by a receiver, ensure that all the - // previous segments are unreachable. - if (s < receiversCounter) segment.cleanPrev() return onClosed() } RESULT_FAILED -> { // Either the cell stores an interrupted receiver, // or it was poisoned by a concurrent receiver. - // In both cases, all the previous segments are already processed, - segment.cleanPrev() continue } RESULT_SUSPEND_NO_WAITER -> { @@ -399,22 +385,16 @@ internal open class BufferedChannel( // restarting the operation from the beginning on failure. // Check the `sendImpl(..)` function for the comments. when (updateCellSend(segment, index, element, s, waiter, false)) { - RESULT_RENDEZVOUS -> { - segment.cleanPrev() - onRendezvousOrBuffered() - } - RESULT_BUFFERED -> { + RESULT_RENDEZVOUS, RESULT_BUFFERED -> { onRendezvousOrBuffered() } RESULT_SUSPEND -> { waiter.prepareSenderForSuspension(segment, index) } RESULT_CLOSED -> { - if (s < receiversCounter) segment.cleanPrev() onClosed() } RESULT_FAILED -> { - segment.cleanPrev() sendImpl( element = element, waiter = waiter, @@ -864,14 +844,9 @@ internal open class BufferedChannel( when { updCellResult === FAILED -> { // The cell is poisoned; restart from the beginning. - // To avoid memory leaks, we also need to reset - // the `prev` pointer of the working segment. - if (r < sendersCounter) segment.cleanPrev() } else -> { // element // A buffered element was retrieved from the cell. - // Clean the reference to the previous segment. - segment.cleanPrev() @Suppress("UNCHECKED_CAST") onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it } } @@ -945,9 +920,6 @@ internal open class BufferedChannel( // but failed: either the opposite request has // already been cancelled or the cell is poisoned. // Restart from the beginning in this case. - // To avoid memory leaks, we also need to reset - // the `prev` pointer of the working segment. - if (r < sendersCounter) segment.cleanPrev() continue } updCellResult === SUSPEND_NO_WAITER -> { @@ -958,8 +930,6 @@ internal open class BufferedChannel( else -> { // element // Either a buffered element was retrieved from the cell // or a rendezvous with a waiting sender has happened. - // Clean the reference to the previous segment before finishing. - segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } @@ -994,7 +964,6 @@ internal open class BufferedChannel( waiter.prepareReceiverForSuspension(segment, index) } updCellResult === FAILED -> { - if (r < sendersCounter) segment.cleanPrev() receiveImpl( waiter = waiter, onElementRetrieved = onElementRetrieved, @@ -1003,7 +972,6 @@ internal open class BufferedChannel( ) } else -> { - segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } @@ -2306,7 +2274,6 @@ internal open class BufferedChannel( // Otherwise, if the required segment is removed, the operation restarts. if (receiveSegment.value.id < id) return false else continue } - segment.cleanPrev() // all the previous segments are no longer needed. // Does the `r`-th cell contain waiting sender or buffered element? val i = (r % SEGMENT_SIZE).toInt() if (isCellNonEmpty(segment, i, r)) return true diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 3f493d7b59..68dc9ed830 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -59,13 +59,29 @@ internal inline fun > AtomicRef.moveForward(to: S): Boolean = if (to.isRemoved) return false // Trying to move pointer to the logically removed segment if (compareAndSet(cur, to)) { // The segment is moved if (to.isRemoved) return false // The segment was removed in parallel during the `CAS` operation - // In case the segment is reached by both `sendSegment` and `receiveSegment`, - // clean its `prev` link to avoid memory leaks. - if (to.isLeftmostOrProcessed) to.cleanPrev() + cleanLeftmostPrev(cur, to) return true } } +/** + * Cleans the `prev` reference of the leftmost segment in the list. The method works with the sublist which + * boundaries are specified by the given nodes [from] and [to]. It looks for the leftmost segment going from + * the tail to the head of the sublist. + * + * The method is called when [moveForward] successfully updates the value stored in the `AtomicRef` reference. + */ +private inline fun > cleanLeftmostPrev(from: S, to: S) { + var cur = to + // Find the leftmost segment on the sublist between `from` and `to` segments. + while (!cur.isLeftmostOrProcessed && cur.id > from.id) { + cur = cur.prev ?: + // The `prev` reference was cleaned in parallel. + return + } + if (cur.isLeftmostOrProcessed) cur.cleanPrev() // The leftmost segment is found +} + /** * Tries to find a segment with the specified [id] following by next references from the * [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values, @@ -278,7 +294,6 @@ internal abstract class Segment>( * Invoked on each slot clean-up; should not be invoked twice for the same slot. */ fun onSlotCleaned() { - if (isLeftmostOrProcessed) cleanPrev() check(cleanedSlots.incrementAndGet() <= SEGMENT_SIZE) { "Some cell was interrupted twice." } if (isRemoved) remove() } From 3b74cc30d995b442f80bcc89572cf09c20a43304 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Thu, 30 Jan 2025 13:33:45 +0100 Subject: [PATCH 12/15] make `SemaphoreSegment` inner class --- .../common/src/sync/Semaphore.kt | 70 +++++++++---------- 1 file changed, 32 insertions(+), 38 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 4f654e2bb9..07a9fe6c82 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -137,7 +137,7 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm init { require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" } require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" } - val s = SemaphoreSegment(0, null, this) + val s = SemaphoreSegment(0, null) head = atomic(s) tail = atomic(s) } @@ -355,51 +355,45 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm } else -> error("unexpected: $this") } -} -private class SemaphoreImpl( - permits: Int, acquiredPermits: Int -): SemaphoreAndMutexImpl(permits, acquiredPermits), Semaphore + private inner class SemaphoreSegment(id: Long, prev: SemaphoreSegment?) : Segment(id, prev) { + val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) + override val numberOfSlots: Int get() = SEGMENT_SIZE + override val isLeftmostOrProcessed: Boolean get() = id <= headId -private fun createSegment(id: Long, prev: SemaphoreSegment) = SemaphoreSegment( - id = id, - prev = prev, - semaphore = prev.semaphore -) - -private class SemaphoreSegment( - id: Long, prev: SemaphoreSegment?, - val semaphore: SemaphoreAndMutexImpl -) : Segment(id, prev) { - val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) - override val numberOfSlots: Int get() = SEGMENT_SIZE - override val isLeftmostOrProcessed: Boolean get() = id <= semaphore.headId - - @Suppress("NOTHING_TO_INLINE") - inline fun get(index: Int): Any? = acquirers[index].value - - @Suppress("NOTHING_TO_INLINE") - inline fun set(index: Int, value: Any?) { - acquirers[index].value = value - } + @Suppress("NOTHING_TO_INLINE") + inline fun get(index: Int): Any? = acquirers[index].value + + @Suppress("NOTHING_TO_INLINE") + inline fun set(index: Int, value: Any?) { + acquirers[index].value = value + } - @Suppress("NOTHING_TO_INLINE") - inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) + @Suppress("NOTHING_TO_INLINE") + inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) - @Suppress("NOTHING_TO_INLINE") - inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value) + @Suppress("NOTHING_TO_INLINE") + inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value) - // Cleans the acquirer slot located by the specified index - // and removes this segment physically if all slots are cleaned. - override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { - // Clean the slot - set(index, CANCELLED) - // Remove this segment if needed - onSlotCleaned() + // Cleans the acquirer slot located by the specified index + // and removes this segment physically if all slots are cleaned. + override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { + // Clean the slot + set(index, CANCELLED) + // Remove this segment if needed + onSlotCleaned() + } + + override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" } - override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" + private fun createSegment(id: Long, prev: SemaphoreSegment) = SemaphoreSegment(id = id, prev = prev) } + +private class SemaphoreImpl( + permits: Int, acquiredPermits: Int +): SemaphoreAndMutexImpl(permits, acquiredPermits), Semaphore + private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100) private val PERMIT = Symbol("PERMIT") private val TAKEN = Symbol("TAKEN") From 906a9721db54cb4dd992b983d02c8e28ce962780 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Thu, 30 Jan 2025 13:36:39 +0100 Subject: [PATCH 13/15] wrapper for `moveToSpecifiedOrLast` --- .../common/src/channels/BufferedChannel.kt | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 42889ba840..1e549561c1 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -1186,7 +1186,7 @@ internal open class BufferedChannel( if (s <= b) { // Should `bufferEndSegment` be moved forward to avoid memory leaks? if (segment.id < id && segment.next != null) - bufferEndSegment.moveToSpecifiedOrLast(id, segment) + moveSegmentBufferEndToSpecifiedOrLast(id, segment) // Increment the number of completed `expandBuffer()`-s and finish. incCompletedExpandBufferAttempts() return @@ -2422,7 +2422,7 @@ internal open class BufferedChannel( val segment = it.segment // Advance the `bufferEnd` segment if required. if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) { - bufferEndSegment.moveToSpecifiedOrLast(id, bufferEndSegment.value) + moveSegmentBufferEndToSpecifiedOrLast(id, bufferEndSegment.value) } // Is the required segment removed? if (segment.id > id) { @@ -2454,7 +2454,7 @@ internal open class BufferedChannel( completeCloseOrCancel() // Update `bufferEndSegment` to the last segment // in the linked list to avoid memory leaks. - bufferEndSegment.moveToSpecifiedOrLast(id, startFrom) + moveSegmentBufferEndToSpecifiedOrLast(id, startFrom) // When this function does not find the requested segment, // it should update the number of completed `expandBuffer()` attempts. incCompletedExpandBufferAttempts() @@ -2484,6 +2484,24 @@ internal open class BufferedChannel( } } + /** + * Serves as a wrapper for an inline function [AtomicRef.moveToSpecifiedOrLast] + */ + private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment) = + bufferEndSegment.moveToSpecifiedOrLast(id, startFrom) + + /** + * Serves as a wrapper for an inline function [AtomicRef.moveToSpecifiedOrLast] + */ + private fun moveSegmentReceiveToSpecifiedOrLast(id: Long, startFrom: ChannelSegment) = + receiveSegment.moveToSpecifiedOrLast(id, startFrom) + + /** + * Serves as a wrapper for an inline function [AtomicRef.moveToSpecifiedOrLast] + */ + private fun moveSegmentSendToSpecifiedOrLast(id: Long, startFrom: ChannelSegment) = + sendSegment.moveToSpecifiedOrLast(id, startFrom) + /** * Updates the `senders` counter if its value * is lower that the specified one. @@ -2516,11 +2534,11 @@ internal open class BufferedChannel( This method is used in the physical removal of the segment. It helps to move pointers forward from the segment which was physically removed. */ - internal fun movePointersForwardFromRemovedSegment(from: ChannelSegment) { - if (!from.isRemoved) return - if (from == sendSegment.value) sendSegment.moveToSpecifiedOrLast(from.id, from) - if (from == receiveSegment.value) receiveSegment.moveToSpecifiedOrLast(from.id, from) - if (from == bufferEndSegment.value) bufferEndSegment.moveToSpecifiedOrLast(from.id, from) + internal fun movePointersForwardFromRemovedSegment(removedSegment: ChannelSegment) { + if (!removedSegment.isRemoved) return + if (removedSegment == sendSegment.value) moveSegmentSendToSpecifiedOrLast(removedSegment.id, removedSegment) + if (removedSegment == receiveSegment.value) moveSegmentReceiveToSpecifiedOrLast(removedSegment.id, removedSegment) + if (removedSegment == bufferEndSegment.value) moveSegmentBufferEndToSpecifiedOrLast(removedSegment.id, removedSegment) } // ################### From 30082e7eafff97a23deb2d6c16b29641dde5b631 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Thu, 30 Jan 2025 16:34:16 +0100 Subject: [PATCH 14/15] fix comments --- .../common/src/channels/BufferedChannel.kt | 8 ++--- .../src/internal/ConcurrentLinkedList.kt | 36 +++++++++++++++++++ 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 1e549561c1..959658c3b9 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -90,7 +90,7 @@ internal open class BufferedChannel( private val receiveSegment: AtomicRef> private val bufferEndSegment: AtomicRef> - /** + /* These values are used in [ChannelSegment.isLeftmostOrProcessed]. They help to detect when the `prev` reference of the segment should be cleaned. */ @@ -306,7 +306,7 @@ internal open class BufferedChannel( // the channel is already closed, storing a waiter is illegal, so // the algorithm stores the `INTERRUPTED_SEND` token in this case. when (updateCellSend(segment, i, element, s, waiter, closed)) { - RESULT_BUFFERED, RESULT_RENDEZVOUS -> { + RESULT_RENDEZVOUS, RESULT_BUFFERED -> { // The element has been buffered or a rendezvous with a receiver has happened. return onRendezvousOrBuffered() } @@ -2794,10 +2794,6 @@ internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: Bu internal fun getAndSetState(index: Int, update: Any?) = data[index * 2 + 1].getAndSet(update) - // ################################################### - // # Manipulation with the structure of segment list # - // ################################################### - /** * Shows if all segments going before this segment have been processed. * When the value is true, the [prev] reference of the segment should be `null`. diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 68dc9ed830..266dfdacd5 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -274,6 +274,42 @@ internal abstract class Segment>( /** * Shows if all nodes going before this node have been processed. + * + * The value depends on the position of only two pointers - `sendSegment` + * and `receiveSegment`. The `bufferEndSegment` does not influence it. + * ``` + * S R EB + * │ │ │ + * ▼ ▼ ▼ + * ┌──────┐ ┌──────┐ ┌──────┐ + * │ #1 │ │ #2 │ │ #3 │ + * └──────┘ └──────┘ └──────┘ + * + * #1 is a processed segment => isLeftmostOrProcessed=true + * #2 is the leftmost segment => isLeftmostOrProcessed=true + * #3: isLeftmostOrProcessed=false + * ``` + * + * Normally, the `bufferEndSegment` pointer is located after `receiveSegment` + * on the segment list. However, there can be races when `expandBuffer()` of + * the existing `receive` request has not finished yet, but a new `receive` + * request has already come. + * In this case, the value still does not depend on the location of the + * `bufferEndSegment` pointer, even though the pointer can be before both + * `sendSegment` and `receiveSegment`: + * + * ``` + * EB S R + * │ │ │ + * ▼ ▼ ▼ + * ┌──────┐ ┌──────┐ ┌──────┐ + * │ #1 │ │ #2 │ │ #3 │ + * └──────┘ └──────┘ └──────┘ + * + * #1 is a processed segment => isLeftmostOrProcessed=true + * #2 is the leftmost segment => isLeftmostOrProcessed=true + * #3: isLeftmostOrProcessed=false + * ``` */ abstract val isLeftmostOrProcessed: Boolean From 4a29907c7bae392913aa0b95a1af0a2d58ae4fe0 Mon Sep 17 00:00:00 2001 From: "Daria.Shutina" Date: Thu, 30 Jan 2025 18:26:08 +0100 Subject: [PATCH 15/15] add check to `movePointersForwardFromRemovedSegment` --- .../common/src/channels/BufferedChannel.kt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 959658c3b9..943fbee8de 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -2531,11 +2531,14 @@ internal open class BufferedChannel( } /** - This method is used in the physical removal of the segment. It helps to move pointers forward from - the segment which was physically removed. + * This method is used in the physical removal of the segment. + * It helps to move pointers forward from the segment which + * was removed (logically or physically). + * + * The method should be called on the removed segments only. */ internal fun movePointersForwardFromRemovedSegment(removedSegment: ChannelSegment) { - if (!removedSegment.isRemoved) return + check(removedSegment.isRemoved) { "Trying to move pointers from the alive segment." } if (removedSegment == sendSegment.value) moveSegmentSendToSpecifiedOrLast(removedSegment.id, removedSegment) if (removedSegment == receiveSegment.value) moveSegmentReceiveToSpecifiedOrLast(removedSegment.id, removedSegment) if (removedSegment == bufferEndSegment.value) moveSegmentBufferEndToSpecifiedOrLast(removedSegment.id, removedSegment)