diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt index 51061cce72..17933d5c3f 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt @@ -13,6 +13,9 @@ import kotlin.coroutines.* @State(Scope.Benchmark) @Fork(1) open class ChannelSinkBenchmark { + @Param("${Channel.RENDEZVOUS}", "${Channel.BUFFERED}") + var capacity: Int = 0 + private val tl = ThreadLocal.withInitial({ 42 }) private val tl2 = ThreadLocal.withInitial({ 239 }) @@ -42,7 +45,7 @@ open class ChannelSinkBenchmark { .fold(0) { a, b -> a + b } } - private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) = GlobalScope.produce(context) { + private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) = GlobalScope.produce(context, capacity) { for (i in start until (start + count)) send(i) } @@ -50,7 +53,7 @@ open class ChannelSinkBenchmark { // Migrated from deprecated operators, are good only for stressing channels private fun ReceiveChannel.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel = - GlobalScope.produce(context, onCompletion = { cancel() }) { + GlobalScope.produce(context, capacity, onCompletion = { cancel() }) { for (e in this@filter) { if (predicate(e)) send(e) } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkDepthBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkDepthBenchmark.kt index 18a140f31b..8a8b29fccb 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkDepthBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkDepthBenchmark.kt @@ -13,6 +13,9 @@ import kotlin.coroutines.* @State(Scope.Benchmark) @Fork(2) open class ChannelSinkDepthBenchmark { + @Param("${Channel.RENDEZVOUS}", "${Channel.BUFFERED}") + var capacity: Int = 0 + private val tl = ThreadLocal.withInitial({ 42 }) private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement() @@ -45,7 +48,7 @@ open class ChannelSinkDepthBenchmark { } private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) = - GlobalScope.produce(context) { + GlobalScope.produce(context, capacity) { for (i in start until (start + count)) send(i) } @@ -57,7 +60,7 @@ open class ChannelSinkDepthBenchmark { context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (Int) -> Boolean ): ReceiveChannel = - GlobalScope.produce(context, onCompletion = { cancel() }) { + GlobalScope.produce(context, capacity, onCompletion = { cancel() }) { deeplyNestedFilter(this, callTraceDepth, predicate) } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkNoAllocationsBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkNoAllocationsBenchmark.kt index 8dec5f2a19..9e99f11a5f 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkNoAllocationsBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkNoAllocationsBenchmark.kt @@ -13,6 +13,9 @@ import kotlin.coroutines.* @State(Scope.Benchmark) @Fork(1) open class ChannelSinkNoAllocationsBenchmark { + @Param("${Channel.RENDEZVOUS}", "${Channel.BUFFERED}") + var capacity: Int = 0 + private val unconfined = Dispatchers.Unconfined @Benchmark @@ -26,7 +29,7 @@ open class ChannelSinkNoAllocationsBenchmark { return size } - private fun Channel.Factory.range(context: CoroutineContext) = GlobalScope.produce(context) { + private fun Channel.Factory.range(context: CoroutineContext) = GlobalScope.produce(context, capacity) { for (i in 0 until 100_000) send(Unit) // no allocations } diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index f94a9e9970..943fbee8de 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -90,9 +90,16 @@ internal open class BufferedChannel( private val receiveSegment: AtomicRef> private val bufferEndSegment: AtomicRef> + /* + These values are used in [ChannelSegment.isLeftmostOrProcessed]. + They help to detect when the `prev` reference of the segment should be cleaned. + */ + internal val sendSegmentId: Long get() = sendSegment.value.id + internal val receiveSegmentId: Long get() = receiveSegment.value.id + init { @Suppress("LeakingThis") - val firstSegment = ChannelSegment(id = 0, prev = null, channel = this, pointers = 3) + val firstSegment = ChannelSegment(id = 0, prev = null, channel = this) sendSegment = atomic(firstSegment) receiveSegment = atomic(firstSegment) // If this channel is rendezvous or has unlimited capacity, the algorithm never @@ -143,9 +150,9 @@ internal open class BufferedChannel( the segment and the index in it. */ segment: ChannelSegment, index: Int, - /** The element to be inserted. */ + /* The element to be inserted. */ element: E, - /** The global index of the cell. */ + /* The global index of the cell. */ s: Long ) = suspendCancellableCoroutineReusable sc@{ cont -> sendImplOnNoWaiter( // <-- this is an inline function @@ -299,16 +306,8 @@ internal open class BufferedChannel( // the channel is already closed, storing a waiter is illegal, so // the algorithm stores the `INTERRUPTED_SEND` token in this case. when (updateCellSend(segment, i, element, s, waiter, closed)) { - RESULT_RENDEZVOUS -> { - // A rendezvous with a receiver has happened. - // The previous segments are no longer needed - // for the upcoming requests, so the algorithm - // resets the link to the previous segment. - segment.cleanPrev() - return onRendezvousOrBuffered() - } - RESULT_BUFFERED -> { - // The element has been buffered. + RESULT_RENDEZVOUS, RESULT_BUFFERED -> { + // The element has been buffered or a rendezvous with a receiver has happened. return onRendezvousOrBuffered() } RESULT_SUSPEND -> { @@ -325,17 +324,11 @@ internal open class BufferedChannel( } RESULT_CLOSED -> { // This channel is closed. - // In case this segment is already or going to be - // processed by a receiver, ensure that all the - // previous segments are unreachable. - if (s < receiversCounter) segment.cleanPrev() return onClosed() } RESULT_FAILED -> { // Either the cell stores an interrupted receiver, // or it was poisoned by a concurrent receiver. - // In both cases, all the previous segments are already processed, - segment.cleanPrev() continue } RESULT_SUSPEND_NO_WAITER -> { @@ -392,22 +385,16 @@ internal open class BufferedChannel( // restarting the operation from the beginning on failure. // Check the `sendImpl(..)` function for the comments. when (updateCellSend(segment, index, element, s, waiter, false)) { - RESULT_RENDEZVOUS -> { - segment.cleanPrev() - onRendezvousOrBuffered() - } - RESULT_BUFFERED -> { + RESULT_RENDEZVOUS, RESULT_BUFFERED -> { onRendezvousOrBuffered() } RESULT_SUSPEND -> { waiter.prepareSenderForSuspension(segment, index) } RESULT_CLOSED -> { - if (s < receiversCounter) segment.cleanPrev() onClosed() } RESULT_FAILED -> { - segment.cleanPrev() sendImpl( element = element, waiter = waiter, @@ -857,14 +844,9 @@ internal open class BufferedChannel( when { updCellResult === FAILED -> { // The cell is poisoned; restart from the beginning. - // To avoid memory leaks, we also need to reset - // the `prev` pointer of the working segment. - if (r < sendersCounter) segment.cleanPrev() } else -> { // element // A buffered element was retrieved from the cell. - // Clean the reference to the previous segment. - segment.cleanPrev() @Suppress("UNCHECKED_CAST") onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it } } @@ -938,9 +920,6 @@ internal open class BufferedChannel( // but failed: either the opposite request has // already been cancelled or the cell is poisoned. // Restart from the beginning in this case. - // To avoid memory leaks, we also need to reset - // the `prev` pointer of the working segment. - if (r < sendersCounter) segment.cleanPrev() continue } updCellResult === SUSPEND_NO_WAITER -> { @@ -951,8 +930,6 @@ internal open class BufferedChannel( else -> { // element // Either a buffered element was retrieved from the cell // or a rendezvous with a waiting sender has happened. - // Clean the reference to the previous segment before finishing. - segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } @@ -987,7 +964,6 @@ internal open class BufferedChannel( waiter.prepareReceiverForSuspension(segment, index) } updCellResult === FAILED -> { - if (r < sendersCounter) segment.cleanPrev() receiveImpl( waiter = waiter, onElementRetrieved = onElementRetrieved, @@ -996,7 +972,6 @@ internal open class BufferedChannel( ) } else -> { - segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } @@ -2299,7 +2274,6 @@ internal open class BufferedChannel( // Otherwise, if the required segment is removed, the operation restarts. if (receiveSegment.value.id < id) return false else continue } - segment.cleanPrev() // all the previous segments are no longer needed. // Does the `r`-th cell contain waiting sender or buffered element? val i = (r % SEGMENT_SIZE).toInt() if (isCellNonEmpty(segment, i, r)) return true @@ -2398,12 +2372,6 @@ internal open class BufferedChannel( // This channel is already closed or cancelled; help to complete // the closing or cancellation procedure. completeCloseOrCancel() - // Clean the `prev` reference of the provided segment - // if all the previous cells are already covered by senders. - // It is important to clean the `prev` reference only in - // this case, as the closing/cancellation procedure may - // need correct value to traverse the linked list from right to left. - if (startFrom.id * SEGMENT_SIZE < receiversCounter) startFrom.cleanPrev() // As the required segment is not found and cannot be allocated, return `null`. null } else { @@ -2415,12 +2383,6 @@ internal open class BufferedChannel( // segment with `id` not lower than the required one. // Skip the sequence of removed cells in O(1). updateSendersCounterIfLower(segment.id * SEGMENT_SIZE) - // Clean the `prev` reference of the provided segment - // if all the previous cells are already covered by senders. - // It is important to clean the `prev` reference only in - // this case, as the closing/cancellation procedure may - // need correct value to traverse the linked list from right to left. - if (segment.id * SEGMENT_SIZE < receiversCounter) segment.cleanPrev() // As the required segment is not found and cannot be allocated, return `null`. null } else { @@ -2453,12 +2415,6 @@ internal open class BufferedChannel( // This channel is already closed or cancelled; help to complete // the closing or cancellation procedure. completeCloseOrCancel() - // Clean the `prev` reference of the provided segment - // if all the previous cells are already covered by senders. - // It is important to clean the `prev` reference only in - // this case, as the closing/cancellation procedure may - // need correct value to traverse the linked list from right to left. - if (startFrom.id * SEGMENT_SIZE < sendersCounter) startFrom.cleanPrev() // As the required segment is not found and cannot be allocated, return `null`. null } else { @@ -2466,7 +2422,7 @@ internal open class BufferedChannel( val segment = it.segment // Advance the `bufferEnd` segment if required. if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) { - bufferEndSegment.moveForward(segment) + moveSegmentBufferEndToSpecifiedOrLast(id, bufferEndSegment.value) } // Is the required segment removed? if (segment.id > id) { @@ -2474,12 +2430,6 @@ internal open class BufferedChannel( // segment with `id` not lower than the required one. // Skip the sequence of removed cells in O(1). updateReceiversCounterIfLower(segment.id * SEGMENT_SIZE) - // Clean the `prev` reference of the provided segment - // if all the previous cells are already covered by senders. - // It is important to clean the `prev` reference only in - // this case, as the closing/cancellation procedure may - // need correct value to traverse the linked list from right to left. - if (segment.id * SEGMENT_SIZE < sendersCounter) segment.cleanPrev() // As the required segment is already removed, return `null`. null } else { @@ -2535,30 +2485,22 @@ internal open class BufferedChannel( } /** - * Updates [bufferEndSegment] to the one with the specified [id] or - * to the last existing segment, if the required segment is not yet created. - * - * Unlike [findSegmentBufferEnd], this function does not allocate new segments. + * Serves as a wrapper for an inline function [AtomicRef.moveToSpecifiedOrLast] */ - private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment) { - // Start searching the required segment from the specified one. - var segment: ChannelSegment = startFrom - while (segment.id < id) { - segment = segment.next ?: break - } - // Skip all removed segments and try to update `bufferEndSegment` - // to the first non-removed one. This part should succeed eventually, - // as the tail segment is never removed. - while (true) { - while (segment.isRemoved) { - segment = segment.next ?: break - } - // Try to update `bufferEndSegment`. On failure, - // the found segment is already removed, so it - // should be skipped. - if (bufferEndSegment.moveForward(segment)) return - } - } + private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment) = + bufferEndSegment.moveToSpecifiedOrLast(id, startFrom) + + /** + * Serves as a wrapper for an inline function [AtomicRef.moveToSpecifiedOrLast] + */ + private fun moveSegmentReceiveToSpecifiedOrLast(id: Long, startFrom: ChannelSegment) = + receiveSegment.moveToSpecifiedOrLast(id, startFrom) + + /** + * Serves as a wrapper for an inline function [AtomicRef.moveToSpecifiedOrLast] + */ + private fun moveSegmentSendToSpecifiedOrLast(id: Long, startFrom: ChannelSegment) = + sendSegment.moveToSpecifiedOrLast(id, startFrom) /** * Updates the `senders` counter if its value @@ -2588,6 +2530,20 @@ internal open class BufferedChannel( if (receivers.compareAndSet(cur, value)) return } + /** + * This method is used in the physical removal of the segment. + * It helps to move pointers forward from the segment which + * was removed (logically or physically). + * + * The method should be called on the removed segments only. + */ + internal fun movePointersForwardFromRemovedSegment(removedSegment: ChannelSegment) { + check(removedSegment.isRemoved) { "Trying to move pointers from the alive segment." } + if (removedSegment == sendSegment.value) moveSegmentSendToSpecifiedOrLast(removedSegment.id, removedSegment) + if (removedSegment == receiveSegment.value) moveSegmentReceiveToSpecifiedOrLast(removedSegment.id, removedSegment) + if (removedSegment == bufferEndSegment.value) moveSegmentBufferEndToSpecifiedOrLast(removedSegment.id, removedSegment) + } + // ################### // # Debug Functions # // ################### @@ -2799,7 +2755,7 @@ internal open class BufferedChannel( * to update [BufferedChannel.sendSegment], [BufferedChannel.receiveSegment], * and [BufferedChannel.bufferEndSegment] correctly. */ -internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: BufferedChannel?, pointers: Int) : Segment>(id, prev, pointers) { +internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: BufferedChannel?) : Segment>(id, prev) { private val _channel: BufferedChannel? = channel val channel get() = _channel!! // always non-null except for `NULL_SEGMENT` @@ -2841,6 +2797,22 @@ internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: Bu internal fun getAndSetState(index: Int, update: Any?) = data[index * 2 + 1].getAndSet(update) + /** + * Shows if all segments going before this segment have been processed. + * When the value is true, the [prev] reference of the segment should be `null`. + */ + override val isLeftmostOrProcessed: Boolean get() = id <= channel.sendSegmentId && id <= channel.receiveSegmentId + + /** + * Removes the segment physically from the segment list. + * + * After the physical removal is finished and there are channel pointers referencing the removed segment, + * the [BufferedChannel.movePointersForwardFromRemovedSegment] method is invoked to move them further on the segment list. + */ + override fun remove() { + super.remove() + channel.movePointersForwardFromRemovedSegment(this) + } // ######################## // # Cancellation Support # @@ -2926,10 +2898,9 @@ internal fun createSegmentFunction(): KFunction2, Ch private fun createSegment(id: Long, prev: ChannelSegment) = ChannelSegment( id = id, prev = prev, - channel = prev.channel, - pointers = 0 + channel = prev.channel ) -private val NULL_SEGMENT = ChannelSegment(id = -1, prev = null, channel = null, pointers = 0) +private val NULL_SEGMENT = ChannelSegment(id = -1, prev = null, channel = null) /** * Number of cells in each segment. diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 8f9b13f4ab..266dfdacd5 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines.internal import kotlinx.atomicfu.* import kotlinx.coroutines.* +import kotlinx.coroutines.channels.SEGMENT_SIZE import kotlin.coroutines.* import kotlin.jvm.* @@ -34,26 +35,57 @@ internal fun > S.findSegmentInternal( return SegmentOrClosed(cur) } +/** + * Returns the segment with the specified [id] or the last one if the required one does not exist (if it was removed + * or was not created yet). + + * Unlike [findSegmentInternal], [findSpecifiedOrLast] does not add new segments to the list. + */ +internal fun > S.findSpecifiedOrLast(id: Long): S { + // Start searching the required segment from the specified one. + var cur: S = this + while (cur.id < id) { + cur = cur.next ?: break + } + return cur +} + /** * Returns `false` if the segment `to` is logically removed, `true` on a successful update. */ @Suppress("NOTHING_TO_INLINE", "RedundantNullableReturnType") // Must be inline because it is an AtomicRef extension internal inline fun > AtomicRef.moveForward(to: S): Boolean = loop { cur -> - if (cur.id >= to.id) return true - if (!to.tryIncPointers()) return false - if (compareAndSet(cur, to)) { // the segment is moved - if (cur.decPointers()) cur.remove() + if (cur.id >= to.id) return true // No need to update the pointer + if (to.isRemoved) return false // Trying to move pointer to the logically removed segment + if (compareAndSet(cur, to)) { // The segment is moved + if (to.isRemoved) return false // The segment was removed in parallel during the `CAS` operation + cleanLeftmostPrev(cur, to) return true } - if (to.decPointers()) to.remove() // undo tryIncPointers +} + +/** + * Cleans the `prev` reference of the leftmost segment in the list. The method works with the sublist which + * boundaries are specified by the given nodes [from] and [to]. It looks for the leftmost segment going from + * the tail to the head of the sublist. + * + * The method is called when [moveForward] successfully updates the value stored in the `AtomicRef` reference. + */ +private inline fun > cleanLeftmostPrev(from: S, to: S) { + var cur = to + // Find the leftmost segment on the sublist between `from` and `to` segments. + while (!cur.isLeftmostOrProcessed && cur.id > from.id) { + cur = cur.prev ?: + // The `prev` reference was cleaned in parallel. + return + } + if (cur.isLeftmostOrProcessed) cur.cleanPrev() // The leftmost segment is found } /** * Tries to find a segment with the specified [id] following by next references from the * [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values, * doing some synchronization, and invoking this function to find the required segment and update the pointer. - * At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed - * (e.g., queues should use it in dequeue operations). * * Since segments can be removed from the list, or it can be closed for further segment additions. * Returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`, @@ -71,6 +103,27 @@ internal inline fun > AtomicRef.findSegmentAndMoveForward( } } +/** + * Updates the `AtomicRef` reference by moving it to the existing segment. + * + * Unlike [findSegmentAndMoveForward], [moveToSpecifiedOrLast] does not add new segments into the list. + */ +@Suppress("NOTHING_TO_INLINE") +internal inline fun > AtomicRef.moveToSpecifiedOrLast(id: Long, startFrom: S) { + // Start searching the required segment from the specified one. + var s = startFrom.findSpecifiedOrLast(id) + // Skip all removed segments and try to update the channel pointer to the first non-removed one. + // This part should succeed eventually, as the tail segment is never removed. + while (true) { + while (s.isRemoved) { + s = s.next ?: break + } + // Try to update the value of `AtomicRef`. + // On failure, the found segment is already removed, so it should be skipped. + if (moveForward(s)) return + } +} + /** * Closes this linked list of nodes by forbidding adding new ones, * returns the last node in the list. @@ -144,8 +197,10 @@ internal abstract class ConcurrentLinkedListNode /** * Removes this node physically from this linked list. The node should be * logically removed (so [isRemoved] returns `true`) at the point of invocation. + * + * Returns `true`, if the node was physically removed, and `false` otherwise. */ - fun remove() { + open fun remove() { assert { isRemoved || isTail } // The node should be logically removed at first. // The physical tail cannot be removed. Instead, we remove it when // a new segment is added and this segment is not the tail one anymore. @@ -168,7 +223,7 @@ internal abstract class ConcurrentLinkedListNode private val aliveSegmentLeft: N? get() { var cur = prev while (cur !== null && cur.isRemoved) - cur = cur._prev.value + cur = cur.prev return cur } @@ -190,7 +245,7 @@ internal abstract class ConcurrentLinkedListNode * instance-check it and uses a separate code-path for that. */ internal abstract class Segment>( - @JvmField val id: Long, prev: S?, pointers: Int + @JvmField val id: Long, prev: S? ) : ConcurrentLinkedListNode(prev), // Segments typically store waiting continuations. Thus, on cancellation, the corresponding // slot should be cleaned and the segment should be removed if it becomes full of cancelled cells. @@ -207,21 +262,56 @@ internal abstract class Segment>( abstract val numberOfSlots: Int /** - * Numbers of cleaned slots (the lowest bits) and AtomicRef pointers to this segment (the highest bits) + * Number of cleaned slots. */ - private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT) + private val cleanedSlots = atomic(0) /** * The segment is considered as removed if all the slots are cleaned * and there are no pointers to this segment from outside. */ - override val isRemoved get() = cleanedAndPointers.value == numberOfSlots && !isTail + override val isRemoved get() = cleanedSlots.value == numberOfSlots && !isTail - // increments the number of pointers if this segment is not logically removed. - internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != numberOfSlots || isTail } - - // returns `true` if this segment is logically removed after the decrement. - internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == numberOfSlots && !isTail + /** + * Shows if all nodes going before this node have been processed. + * + * The value depends on the position of only two pointers - `sendSegment` + * and `receiveSegment`. The `bufferEndSegment` does not influence it. + * ``` + * S R EB + * │ │ │ + * ▼ ▼ ▼ + * ┌──────┐ ┌──────┐ ┌──────┐ + * │ #1 │ │ #2 │ │ #3 │ + * └──────┘ └──────┘ └──────┘ + * + * #1 is a processed segment => isLeftmostOrProcessed=true + * #2 is the leftmost segment => isLeftmostOrProcessed=true + * #3: isLeftmostOrProcessed=false + * ``` + * + * Normally, the `bufferEndSegment` pointer is located after `receiveSegment` + * on the segment list. However, there can be races when `expandBuffer()` of + * the existing `receive` request has not finished yet, but a new `receive` + * request has already come. + * In this case, the value still does not depend on the location of the + * `bufferEndSegment` pointer, even though the pointer can be before both + * `sendSegment` and `receiveSegment`: + * + * ``` + * EB S R + * │ │ │ + * ▼ ▼ ▼ + * ┌──────┐ ┌──────┐ ┌──────┐ + * │ #1 │ │ #2 │ │ #3 │ + * └──────┘ └──────┘ └──────┘ + * + * #1 is a processed segment => isLeftmostOrProcessed=true + * #2 is the leftmost segment => isLeftmostOrProcessed=true + * #3: isLeftmostOrProcessed=false + * ``` + */ + abstract val isLeftmostOrProcessed: Boolean /** * This function is invoked on continuation cancellation when this segment @@ -240,15 +330,8 @@ internal abstract class Segment>( * Invoked on each slot clean-up; should not be invoked twice for the same slot. */ fun onSlotCleaned() { - if (cleanedAndPointers.incrementAndGet() == numberOfSlots) remove() - } -} - -private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean { - while (true) { - val cur = this.value - if (!condition(cur)) return false - if (this.compareAndSet(cur, cur + delta)) return true + check(cleanedSlots.incrementAndGet() <= SEGMENT_SIZE) { "Some cell was interrupted twice." } + if (isRemoved) remove() } } @@ -259,6 +342,4 @@ internal value class SegmentOrClosed>(private val value: Any?) { val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S } -private const val POINTERS_SHIFT = 16 - private val CLOSED = Symbol("CLOSED") diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 7cc13f81f8..07a9fe6c82 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -128,10 +128,16 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm private val tail: AtomicRef private val enqIdx = atomic(0L) + /** + This value is used in [SemaphoreSegment.isLeftmostOrProcessed]. + It helps to detect when the `prev` reference of the segment should be cleaned. + */ + internal val headId: Long get() = head.value.id + init { require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" } require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" } - val s = SemaphoreSegment(0, null, 2) + val s = SemaphoreSegment(0, null) head = atomic(s) tail = atomic(s) } @@ -317,7 +323,6 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm val createNewSegment = ::createSegment val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead, createNewSegment = createNewSegment).segment // cannot be closed - segment.cleanPrev() if (segment.id > id) return false val i = (deqIdx % SEGMENT_SIZE).toInt() val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state @@ -350,43 +355,45 @@ internal open class SemaphoreAndMutexImpl(private val permits: Int, acquiredPerm } else -> error("unexpected: $this") } -} - -private class SemaphoreImpl( - permits: Int, acquiredPermits: Int -): SemaphoreAndMutexImpl(permits, acquiredPermits), Semaphore -private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0) + private inner class SemaphoreSegment(id: Long, prev: SemaphoreSegment?) : Segment(id, prev) { + val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) + override val numberOfSlots: Int get() = SEGMENT_SIZE + override val isLeftmostOrProcessed: Boolean get() = id <= headId -private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment(id, prev, pointers) { - val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) - override val numberOfSlots: Int get() = SEGMENT_SIZE + @Suppress("NOTHING_TO_INLINE") + inline fun get(index: Int): Any? = acquirers[index].value - @Suppress("NOTHING_TO_INLINE") - inline fun get(index: Int): Any? = acquirers[index].value + @Suppress("NOTHING_TO_INLINE") + inline fun set(index: Int, value: Any?) { + acquirers[index].value = value + } - @Suppress("NOTHING_TO_INLINE") - inline fun set(index: Int, value: Any?) { - acquirers[index].value = value - } + @Suppress("NOTHING_TO_INLINE") + inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) - @Suppress("NOTHING_TO_INLINE") - inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) + @Suppress("NOTHING_TO_INLINE") + inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value) - @Suppress("NOTHING_TO_INLINE") - inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value) + // Cleans the acquirer slot located by the specified index + // and removes this segment physically if all slots are cleaned. + override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { + // Clean the slot + set(index, CANCELLED) + // Remove this segment if needed + onSlotCleaned() + } - // Cleans the acquirer slot located by the specified index - // and removes this segment physically if all slots are cleaned. - override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { - // Clean the slot - set(index, CANCELLED) - // Remove this segment if needed - onSlotCleaned() + override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" } - override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" + private fun createSegment(id: Long, prev: SemaphoreSegment) = SemaphoreSegment(id = id, prev = prev) } + +private class SemaphoreImpl( + permits: Int, acquiredPermits: Int +): SemaphoreAndMutexImpl(permits, acquiredPermits), Semaphore + private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100) private val PERMIT = Symbol("PERMIT") private val TAKEN = Symbol("TAKEN") diff --git a/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt b/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt index f005c93e5e..290db7580f 100644 --- a/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt +++ b/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt @@ -160,8 +160,9 @@ class CancellableContinuationHandlersTest : TestBase() { @Test fun testSegmentAsHandler() = runTest { - class MySegment : Segment(0, null, 0) { + class MySegment : Segment(0, null) { override val numberOfSlots: Int get() = 0 + override val isLeftmostOrProcessed: Boolean get() = false var invokeOnCancellationCalled = false override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {