diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle index be7622aa49..facb11b24b 100644 --- a/kotlinx-coroutines-core/build.gradle +++ b/kotlinx-coroutines-core/build.gradle @@ -119,8 +119,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) { enableAssertions = true testLogging.showStandardStreams = true systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test - systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2' - systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10' + systemProperty 'kotlinx.coroutines.sqs.segmentSize', '2' + systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '10' } task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) { diff --git a/kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt b/kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt new file mode 100644 index 0000000000..816d591b48 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt @@ -0,0 +1,606 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.ASYNC +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.SYNC +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.CancellationMode.* +import kotlinx.coroutines.sync.* +import kotlin.coroutines.* +import kotlin.native.concurrent.* + +/** + * [SegmentQueueSynchronizer] is an abstraction for implementing _fair_ synchronization + * and communication primitives. It maintains a FIFO queue of waiting requests and is + * provided with two main functions: + * + [suspend] that stores the specified waiter into the queue, and + * + [resume] that tries to retrieve and resume the first waiter with the specified value. + * + * One may consider this abstraction as an infinite array with two counters that reference the next cells + * for enqueueing a continuation in [suspend] and for retrieving it in [resume]. To be short, when + * [suspend] is invoked, it increments the corresponding counter via fast `Fetch-And-Add` and stores the + * continuation into the cell. At the same time, [resume] increments its own counter and comes to the + * corresponding cell. + * + * A typical implementation via [SegmentQueueSynchronizer] performs some synchronization at first, + * (e.g., [Semaphore] modifies the number of available permits), and invokes [suspend] or [resume] + * after that. Following this pattern, it is possible in a concurrent environment that [resume] + * is executed before [suspend] (similarly to the race between `park` and `unpark` for threads), + * so that [resume] comes to a cell in the empty state. This race can be solved with two [strategies][ResumeMode]: + * [asynchronous][ASYNC] and [synchronous][SYNC]. In the [asynchronous][ASYNC] mode, [resume] puts + * the value into the cell if it is still empty and finishes immediately; this way, the following [suspend] comes + * to this cell and simply grabs the element without suspension. At the same time, in the [synchronous][SYNC] mode, + * [resume] waits in a bounded spin-loop cycle until the put element is taken by a concurrent [suspend], + * marking the cell as broken if it is not taken after the spin-loop is finished. In this case, both the current + * [resume] and the [suspend] that comes to this broken cell fail and the corresponding operations on the data + * structure are typically restarted from the beginning. + * + * Since [suspend] can store [CancellableContinuation]-s, it is possible for [resume] to fail if the + * continuation is already cancelled. In this case, most of the algorithms retry the whole operation. + * However, if there are many consecutive cancelled waiters, it seems more efficient to skip them + * somehow. Thus, there are different [cancellation modes][cancellationMode] that can be used. + * In the simple mode ([SIMPLE]), which is used by default, [resume] simply fails on cancelled waiters as + * described above, while in smart modes ([SMART_SYNC] and [SMART_ASYNC]) [resume] skips cells in the + * [cancelled][CANCELLED] state. However, if cancellation happens concurrently with [resume], it can be illegal + * to simply skip the cell and resume the next waiter, e.g., if this cancelled waiter is the last one. + * Thus, it is possible for [SegmentQueueSynchronizer] to refuse this [resume]. In order to support this logic, + * users should implement [onCancellation] function, which returns `true` if the cell can be + * moved to the [cancelled][CANCELLED] state, or `false` if the [resume] that comes to this cell + * should be refused. In the last case, [tryReturnRefusedValue] is invoked, so that the value + * can be put back to the data structure. However, it is possible for [tryReturnRefusedValue] to + * fail, and [returnValue] is called in this case. Typically, this [returnValue] function coincide + * with the one that resumes waiters (e.g., [release][Semaphore.release] in [Semaphore]). The difference + * between [SMART_SYNC] and [SMART_ASYNC] modes is that in the first one the [resume] that comes to the + * cell with cancelled waiter waits in a spin-loop until the cell is marked either with [CANCELLED] + * or [REFUSE] mark, while in [SMART_ASYNC] mode it replaces the cancelled waiter with the value of + * the resumption and finishes, so that the cancellation handler completes this [resume] eventually. + * This way, in [SMART_ASYNC] mode the value passed to [resume] can be out of the data structure for a while. + * + * Here is a state machine for cells. Note that only one [suspend] + * and at most one [resume] operation can deal with each cell. + * + * +-------+ `suspend` succeeds. +--------------+ `resume` is +---------------+ store `RESUMED` to +---------+ ( `cont` HAS BEEN ) + * | NULL | ----------------------> | cont: active | -------------> | cont: resumed | --------------------> | RESUMED | ( RESUMED AND THIS ) + * +-------+ +--------------+ successful. +---------------+ avoid memory leaks. +---------+ ( `resume` SUCCEEDS ) + * | | + * | | The continuation + * | `resume` comes to | is cancelled. + * | the cell before | + * | `suspend` and puts V ( THE CORRESPONDING `resume` SHOULD BE ) + * | the element into +-----------------+ The concurrent `resume` should be refused, +--------+ ( REFUSED AND `tryReturnRefusedValue` OR ) + * | the cell, waiting | cont: cancelled | -----------------------------------------------> | REFUSE | ( `returnValue` IF IT FAILS IS USED TO ) + * | for `suspend` in +-----------------+ `onCancellation` has returned `false`. +--------+ ( RETURN THE VALUE BACK TO THE ORIGINAL ) + * | SYNC mode. | \ ^ ( SYNCHRONIZATION PRIMITIVE ) + * | | \ | + * | Mark the cell as `CANCELLED` | \ | + * | if the cancellation mode is | \ `resume` delegates its completing to | `onCancellation` returned `false, + * | `SIMPLE` or `onCancellation` | \ the concurrent cancellation handler if | mark the state accordingly and + * | has returned `true`. | \ `SMART_ASYNC` cancellation mode is used. | complete the hung `resume`. + * | | +------------------------------------------+ | + * | | \ | + * | ( THE CONTINUATION IS ) V V | + * | ( CANCELLED AND `resume` ) +-----------+ +-------+ + * | ( EITHER FAILS IN THE ) | CANCELLED | <-------------------------------------------------- | value | + * | ( SIMPLE CANCELLATION ) +-----------+ Mark the cell as `CANCELLED` if `onCancellation` +-------+ + * | ( MODE OR SKIPS THIS ) returned true, complete the hung `resume` accordingly. + * | ( CELL IN THE SMART ONE ) + * | + * | + * | `suspend` gets +-------+ ( ELIMINATION HAPPENED, ) + * | +-----------------> | TAKEN | ( BOTH `resume` and ) + * V | the element. +-------+ ( `suspend` SUCCEED ) + * +-------+ | + * | value | --< + * +-------+ | + * | `tryResume` has waited a bounded time, +--------+ + * +---------------------------------------> | BROKEN | (BOTH `suspend` AND `resume` FAIL) + * but `suspend` has not come. +--------+ + * + * As for the infinite array implementation, it is organized as a linked list of [segments][SQSSegment]; + * each segment contains a fixed number of cells. To determine the cell for each [suspend] and [resume] + * invocation, the algorithm reads the current [tail] or [head], increments [enqIdx] or [deqIdx], and + * finds the required segment starting from the initially read one. + */ +@InternalCoroutinesApi +internal abstract class SegmentQueueSynchronizer { + private val head: AtomicRef + private val deqIdx = atomic(0L) + private val tail: AtomicRef + private val enqIdx = atomic(0L) + + init { + val s = SQSSegment(0, null, 2) + head = atomic(s) + tail = atomic(s) + } + + /** + * Specifies whether [resume] should work in + * [synchronous][SYNC] or [asynchronous][ASYNC] mode. + */ + protected open val resumeMode: ResumeMode get() = SYNC + + /** + * Specifies whether [resume] should fail on cancelled waiters ([SIMPLE]), + * or skip them in either [synchronous][SMART_SYNC] or [asynchronous][SMART_ASYNC] + * way. In the asynchronous mode [resume] may pass the element to the + * cancellation handler in order not to wait, so that the element can be "hung" + * for a while, but it is guaranteed that this [resume] will be completed eventually. + */ + protected open val cancellationMode: CancellationMode get() = SIMPLE + + /** + * This function is invoked when waiter is cancelled and smart + * cancellation mode is used (so that cancelled cells are skipped by + * [resume]). Typically, this function performs the logical cancellation. + * It returns `true` if the cancellation succeeds and the cell can be + * marked as [CANCELLED]. This way, a concurrent [resume] skips this cell, + * and the value stays in the waiting queue. However, if the concurrent + * [resume] should be refused by this [SegmentQueueSynchronizer], + * [onCancellation] should return false. In this case, [tryReturnRefusedValue] + * is invoked with the value of [resume], following by [returnValue] + * if the attempt fails. + */ + open fun onCancellation() : Boolean = false + + /** + * This function specifies how the refused by + * this [SegmentQueueSynchronizer] value should + * be returned back to the data structure. It + * returns `true` if succeeds or `false` if the + * attempt failed, so that [returnValue] should + * be used to complete the returning. + */ + open fun tryReturnRefusedValue(value: T): Boolean = true + + /** + * This function specifies how the value from + * a failed [resume] should be returned back to + * the data structure. It is typically the function + * that invokes [resume] (e.g., [release][Semaphore.release] + * in [Semaphore]). + */ + open fun returnValue(value: T) {} + + /** + * This is a short-cut for [tryReturnRefusedValue] and + * the following [returnValue] invocation if it fails. + */ + private fun returnRefusedValue(value: T) { + if (tryReturnRefusedValue(value)) return + returnValue(value) + } + + /** + * Puts the specified continuation into the waiting queue, and returns `true` on success. + * Since [suspend] and [resume] can be invoked concurrently (similarly to `park` and `unpark` + * for threads), it is possible that [resume] comes earlier. In this case, the [resume] comes + * to the first cell and puts it value into it. After that, this [suspend] should come and + * grab the value. However, if the [synchronous][SYNC] resumption mode is used, the concurrent + * [resume] can mark its cell as [broken][BROKEN]; thus, this [suspend] invocation comes to the + * broken cell and fails. The typical patter is retrying both operations, the one that + * failed on [suspend] and the one that failed on [resume], from the beginning. + */ + @Suppress("UNCHECKED_CAST") + fun suspend(cont: Continuation): Boolean { + // Increment `enqIdx` and find the segment + // with the corresponding id. It is guaranteed + // that this segment is not removed since at + // least the cell for this [suspend] invocation + // is not in the `CANCELLED` state. + val curTail = this.tail.value + val enqIdx = enqIdx.getAndIncrement() + val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail, + createNewSegment = ::createSegment).segment + assert { segment.id == enqIdx / SEGMENT_SIZE } + // Try to install the continuation in the cell, + // this is the regular path. + val i = (enqIdx % SEGMENT_SIZE).toInt() + if (segment.cas(i, null, cont)) { + // The continuation is successfully installed, + // add a cancellation handler if it is cancellable + // and complete successfully. + if (cont is CancellableContinuation<*>) { + cont.invokeOnCancellation(SQSCancellationHandler(segment, i).asHandler) + } + return true + } + // The continuation installation failed. This can happen only + // if a concurrent `resume` comes earlier to this cell and put + // its value into it. Note, that in `SYNC` resumption mode + // this concurrent `resume` can mark the cell as broken. + // + // Try to grab the value if the cell is not in the `BROKEN` state. + val value = segment.get(i) + if (value !== BROKEN && segment.cas(i, value, TAKEN)) { + // The elimination is successfully performed, + // resume the continuation with the value and complete. + cont.resume(value as T) + return true + } + // The cell is broken, this can happen only in `SYNC` resumption mode. + assert { resumeMode == SYNC && segment.get(i) === BROKEN } + return false + } + + /** + * Tries to resume the next waiter and returns `true` if + * the resumption succeeds. However, it can fail due to + * several reasons. First of all, if the [resumption mode][resumeMode] + * is [synchronous][SYNC], this [resume] invocation may come + * before [suspend] and mark the cell as [broken][BROKEN]; + * `false` is returned in this case. At the same time, according + * to the [simple cancellation mode][SIMPLE], this [resume] fails + * if the next waiter is cancelled, and returns `false` in this case. + * + * Note that when smart cancellation ([SMART_SYNC] or [SMART_ASYNC]) + * is used, [resume] skips cancelled waiters and can fail only in + * case of unsuccessful elimination due to [synchronous][SYNC] + * resumption mode. + */ + fun resume(value: T): Boolean { + // Should we skip cancelled cells? + val skipCancelled = cancellationMode != SIMPLE + while (true) { + // Try to resume the next waiter, adjust [deqIdx] if + // cancelled cells should be skipped anyway. + when (tryResumeImpl(value, adjustDeqIdx = skipCancelled)) { + TRY_RESUME_SUCCESS -> return true + TRY_RESUME_FAIL_CANCELLED -> if (!skipCancelled) return false + TRY_RESUME_FAIL_BROKEN -> return false + } + } + } + + /** + * Tries to resume the next waiter, and returns [TRY_RESUME_SUCCESS] on + * success, [TRY_RESUME_FAIL_CANCELLED] if the next waiter is cancelled, + * or [TRY_RESUME_FAIL_BROKEN] if the next cell is marked as broken by + * this [tryResumeImpl] invocation due to the [SYNC] resumption mode. + * + * In the smart cancellation modes ([SMART_SYNC] and [SMART_ASYNC]) the + * cells marked as [cancelled][CANCELLED] should be skipped, so that + * there is no need to increment [deqIdx] one-by-one is there is a + * removed segment (logically full of [cancelled][CANCELLED] cells); + * it is faster to point [deqIdx] to the first possibly non-cancelled + * cell instead, i.e. to the first segment id multiplied by the + * [segment size][SEGMENT_SIZE]. + */ + @Suppress("UNCHECKED_CAST") + private fun tryResumeImpl(value: T, adjustDeqIdx: Boolean): Int { + // Check that `adjustDeqIdx` is `false` + // in `SYNC` resumption mode. + assert { !(resumeMode == SYNC && adjustDeqIdx) } + // Increment `deqIdx` and find the first segment with + // the corresponding or higher (if the required segment + // is physically removed) id. + val curHead = this.head.value + val deqIdx = deqIdx.getAndIncrement() + val id = deqIdx / SEGMENT_SIZE + val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead, + createNewSegment = ::createSegment).segment + // The previous segments can be safely collected + // by GC, clean the pointer to them. + segment.cleanPrev() + // Is the required segment physically removed? + if (segment.id > id) { + // Adjust `deqIdx` to the first + // non-removed segment if needed. + if (adjustDeqIdx) adjustDeqIdx(segment.id * SEGMENT_SIZE) + // The cell #deqIdx is in the cancelled state, + // return the corresponding failure. + return TRY_RESUME_FAIL_CANCELLED + } + // Modify the cell according to the state machine, + // all the transitions are performed atomically. + val i = (deqIdx % SEGMENT_SIZE).toInt() + modify_cell@while (true) { + val cellState = segment.get(i) + when { + // Is the cell empty? + cellState === null -> { + // Try to perform an elimination by putting the + // value to the empty cell and wait until it is + // taken by a concurrent `suspend` in case of + // using the synchronous resumption mode. + if (!segment.cas(i, null, value)) continue@modify_cell + // Finish immediately in the async mode. + if (resumeMode == ASYNC) return TRY_RESUME_SUCCESS + // Wait for a concurrent `suspend` for a bounded + // time; it should mark the cell as taken. + repeat(MAX_SPIN_CYCLES) { + if (segment.get(i) === TAKEN) return TRY_RESUME_SUCCESS + } + // The value is still not taken, try to + // atomically mark the cell as broken. + // A failure indicates that the value is taken. + return if (segment.cas(i, value, BROKEN)) TRY_RESUME_FAIL_BROKEN else TRY_RESUME_SUCCESS + } + // Is the waiter cancelled? + cellState === CANCELLED -> { + // Return the corresponding failure. + return TRY_RESUME_FAIL_CANCELLED + } + // Should the current `resume` be refused + // by this `SegmentQueueSynchronizer`? + cellState === REFUSE -> { + // This state should not occur + // in the simple cancellation mode. + assert { cancellationMode != SIMPLE } + // Return the refused value back to + // the data structure and succeed. + returnRefusedValue(value) + return TRY_RESUME_SUCCESS + } + // Does the cell store a cancellable continuation? + cellState is CancellableContinuation<*> -> { + // Try to resume the continuation. + val success = (cellState as CancellableContinuation).tryResume0(value) + // Is the resumption successful? + if (success) { + // Mark the cell as `DONE` to avoid + // memory leaks and complete successfully. + segment.set(i, RESUMED) + return TRY_RESUME_SUCCESS + } + // The continuation is cancelled, the handling + // logic depends on the cancellation mode. + when (cancellationMode) { + // Fail correspondingly in the simple mode. + SIMPLE -> return TRY_RESUME_FAIL_CANCELLED + // In the smart cancellation mode this cell + // can be either skipped (if it is going to + // be marked as cancelled) or this `resume` + // should be refused. The `SMART_SYNC` mode + // waits in a an infinite spin-loop until + // the state of this cell is changed to + // either `CANCELLED` or `REFUSE`. + SMART_SYNC -> continue@modify_cell + // At the same time, in `SMART_ASYNC` mode + // `resume` replaces the cancelled continuation + // with the resumption value and completes. + // Thus, the concurrent cancellation handler + // notices this value and completes this `resume`. + SMART_ASYNC -> { + // Try to put the value into the cell if there is + // no decision on whether the cell is in the `CANCELLED` + // or `REFUSE` state, and finish if the put is performed. + val valueToStore: Any? = if (value is Continuation<*>) WrappedContinuationValue(value) else value + if (segment.cas(i, cellState, valueToStore)) return TRY_RESUME_SUCCESS + } + } + } + // The cell stores a non-cancellable + // continuation, we can simply resume it. + else -> { + // Resume the continuation and mark the cell + // as `DONE` to avoid memory leaks. + (cellState as Continuation).resume(value) + segment.set(i, RESUMED) + return TRY_RESUME_SUCCESS + } + } + } + } + + /** + * Updates [deqIdx] to [newValue] if the current value is lower. + * Thus, it is guaranteed that either the update is successfully + * performed or the value of [deqIdx] is greater or equal to [newValue]. + */ + private fun adjustDeqIdx(newValue: Long): Unit = deqIdx.loop { cur -> + if (cur >= newValue) return + if (deqIdx.compareAndSet(cur, newValue)) return + } + + /** + * These modes define the strategy that [resume] should + * use if it comes to the cell before [suspend] and finds it empty. + * In the [asynchronous][ASYNC] mode, it puts the value into the cell, + * so that [suspend] grabs it and immediately resumes without actual + * suspension; in other words, an elimination happens in this case. + * However, this strategy produces an incorrect behavior when used for some + * data structures (e.g., for [tryAcquire][Semaphore.tryAcquire] in [Semaphore]), + * so that the [synchronous][SYNC] is introduced. Similarly to the asynchronous one, + * [resume] puts the value into the cell, but do not finish right after that. + * In opposite, it waits in a bounded spin-loop (see [MAX_SPIN_CYCLES]) until + * the value is taken, completes after that. If the value is not taken after + * this spin-loop is finished, [resume] marks the cell as [broken][BROKEN] + * and fails, so that the corresponding [suspend] invocation finds the cell + * [broken][BROKEN] and fails as well. + */ + internal enum class ResumeMode { SYNC, ASYNC } + + /** + * These modes define the mode that should be used for cancellation. + * Essentially, there are two modes: simple and smart. + * Specifies whether [resume] should fail on cancelled waiters ([SIMPLE]), + * or skip them in either [synchronous][SMART_SYNC] or [asynchronous][SMART_ASYNC] + * way. In the asynchronous skip mode [resume] may pass the element to the + * cancellation handler in order not to wait, so that the element can be "hung" + * for a while. + */ + internal enum class CancellationMode { SIMPLE, SMART_SYNC, SMART_ASYNC } + + /** + * This cancellation handler is invoked when + * the waiter located by ```segment[index]``` + * is cancelled. + */ + private inner class SQSCancellationHandler( + private val segment: SQSSegment, + private val index: Int + ) : CancelHandler() { + override fun invoke(cause: Throwable?) { + // Do we use simple or smart cancellation? + if (cancellationMode === SIMPLE) { + // In the simple cancellation mode the logic + // is straightforward -- mark the cell as + // cancelled to avoid memory leaks and complete. + segment.markCancelled(index) + return + } + // We are in the smart cancellation mode. + // Perform the cancellation-related logic and + // check whether the value of the `resume` that + // comes to this cell should be processed in the + // `SegmentQueueSynchronizer` or refused by it. + val cancelled = onCancellation() + if (cancelled) { + // The cell should be considered as cancelled. + // Mark the cell correspondingly and help a + // concurrent `resume` to process its value if + // needed (see `SMART_ASYNC` cancellation mode). + val value = segment.markCancelled(index) ?: return + // Try to resume the next waiter with the value + // provided by a concurrent `resume`. + if (resume(value as T)) return + // The resumption has been failed because of the + // `SYNC` resume mode. Return the value back to + // the original data structure. + returnValue(value) + } else { + // The value of the `resume` that comes to this + // cell should be refused by this `SegmentQueueSynchronizer`. + // Mark the cell correspondingly and help a concurrent + // `resume` to process its value if needed + // (see `SMART_ASYNC` cancellation mode). + val value = segment.markRefused(index) ?: return + returnRefusedValue(value as T) + } + } + + override fun toString() = "SQSCancellationHandler[$segment, $index]" + } +} + +/** + * Tries to resume this continuation atomically, + * returns `true` if succeeds and `false` otherwise. + */ +private fun CancellableContinuation.tryResume0(value: T): Boolean { + val token = tryResume(value) ?: return false + completeResume(token) + return true +} + +private fun createSegment(id: Long, prev: SQSSegment?) = SQSSegment(id, prev, 0) + +/** + * The queue of waiters in [SegmentQueueSynchronizer] + * is represented as a linked list of these segments. + */ +private class SQSSegment(id: Long, prev: SQSSegment?, pointers: Int) : Segment(id, prev, pointers) { + private val waiters = atomicArrayOfNulls(SEGMENT_SIZE) + override val maxSlots: Int get() = SEGMENT_SIZE + + @Suppress("NOTHING_TO_INLINE") + inline fun get(index: Int): Any? = waiters[index].value + + @Suppress("NOTHING_TO_INLINE") + inline fun set(index: Int, value: Any?) { + waiters[index].value = value + } + + @Suppress("NOTHING_TO_INLINE") + inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = waiters[index].compareAndSet(expected, value) + + @Suppress("NOTHING_TO_INLINE") + inline fun getAndSet(index: Int, value: Any?): Any? = waiters[index].getAndSet(value) + + /** + * Marks the cell as cancelled and returns `null`, so that + * the [resume] that comes to the cell should notice + * that the cell is cancelled and should fail or skip it + * depending on the cancellation mode. However, in [SMART_ASYNC] + * cancellation mode [resume] that comes to the cell with cancelled + * continuation asynchronously puts its value into the cell, + * and the cancellation handler completes the resumption. + * In this case, [markCancelled] returns this non-null value. + * + * If the whole segment contains [CANCELLED] markers after + * this invocation, [onSlotCleaned] is invoked and this segment + * is going to be removed if [head][SegmentQueueSynchronizer.head] + * and [tail][SegmentQueueSynchronizer.tail] do not reference it. + * Note that the segments that are not stored physically are still + * considered as logically stored but being full of cancelled waiters. + */ + fun markCancelled(index: Int): Any? = mark(index, CANCELLED).also { + onSlotCleaned() + } + + /** + * Marks the cell as refused and returns `null`, so that + * the [resume] that comes to the cell should notice + * that its value is refused by the [SegmentQueueSynchronizer], + * and [SegmentQueueSynchronizer.tryReturnRefusedValue] + * is invoked in this case (if it fails, the value is put back via + * [SegmentQueueSynchronizer.returnValue]). Since in [SMART_ASYNC] + * cancellation mode [resume] that comes to the cell with cancelled + * continuation asynchronously puts its value into the cell. + * In this case, [markRefused] returns this non-null value. + */ + fun markRefused(index: Int): Any? = mark(index, REFUSE) + + /** + * Marks the cell with the specified [marker] + * and returns `null` if the cell contains the + * cancelled continuation. However, in the [SMART_ASYNC] + * cancellation mode it is possible that [resume] comes + * to the cell with cancelled continuation and asynchronously + * puts its value into the cell, so that the cancellation + * handler decides whether this value should be used for + * resuming the next waiter or be refused. In the latter case, + * the corresponding non-null value is returned as a result. + */ + private fun mark(index: Int, marker: Any?): Any? = + when (val old = getAndSet(index, marker)) { + // Did the cell contain the cancelled continuation? + is Continuation<*> -> { + assert { if (old is CancellableContinuation<*>) old.isCancelled else true } + null + } + // Did the cell contain an asynchronously put value? + // (both branches deal with values) + is WrappedContinuationValue -> old.cont + else -> old + } + + override fun toString() = "SQSSegment[id=$id, hashCode=${hashCode()}]" +} + +/** + * In the [smart asynchronous cancellation mode][SegmentQueueSynchronizer.CancellationMode.SMART_ASYNC] + * it is possible that [resume] comes to the cell with cancelled continuation and + * asynchronously puts its value into the cell, so that the cancellation handler decides whether + * this value should be used for resuming the next waiter or be refused. When this + * value is a continuation, it is hard to distinguish it with the one related to the cancelled + * waiter. Thus, such values are wrapped with [WrappedContinuationValue] in this case. Note, that the + * wrapper is required only in [SegmentQueueSynchronizer.CancellationMode.SMART_ASYNC] mode + * and is used in the asynchronous race resolution logic between cancellation and [resume] + * invocation; this way, it is used relatively rare. + */ +private class WrappedContinuationValue(val cont: Continuation<*>) + +@SharedImmutable +private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.sqs.segmentSize", 16) +@SharedImmutable +private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.sqs.maxSpinCycles", 100) +@SharedImmutable +private val TAKEN = Symbol("TAKEN") +@SharedImmutable +private val BROKEN = Symbol("BROKEN") +@SharedImmutable +private val CANCELLED = Symbol("CANCELLED") +@SharedImmutable +private val REFUSE = Symbol("REFUSE") +@SharedImmutable +private val RESUMED = Symbol("RESUMED") + +private const val TRY_RESUME_SUCCESS = 0 +private const val TRY_RESUME_FAIL_CANCELLED = 1 +private const val TRY_RESUME_FAIL_BROKEN = 2 \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 27c976ce3f..144ea8c016 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -10,7 +10,6 @@ import kotlinx.coroutines.internal.* import kotlin.contracts.* import kotlin.coroutines.* import kotlin.math.* -import kotlin.native.concurrent.SharedImmutable /** * A counting semaphore for coroutines that logically maintains a number of available permits. @@ -89,53 +88,10 @@ public suspend inline fun Semaphore.withPermit(action: () -> T): T { } } -private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore { - /* - The queue of waiting acquirers is essentially an infinite array based on the list of segments - (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue - and dequeue operation, we increment the corresponding counter at the beginning of the operation - and use the value before the increment as a slot number. This way, each enqueue-dequeue pair - works with an individual cell. We use the corresponding segment pointers to find the required ones. - - Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation - can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons - so that the state `PERMIT` represents different logical states. - - +------+ `acquire` suspends +------+ `release` tries +--------+ // if `cont.tryResume(..)` succeeds, then - | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED) // the corresponding `acquire` operation gets - +------+ +------+ to resume `cont` +--------+ // a permit and the `release` one completes. - | | - | | `acquire` request is cancelled and the continuation is - | `release` comes | replaced with a special `CANCEL` token to avoid memory leaks - | to the slot before V - | `acquire` and puts +-----------+ `release` has +--------+ - | a permit into the | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED) - | slot, waiting for +-----------+ failed +--------+ - | `acquire` after - | that. - | - | `acquire` gets +-------+ - | +-----------------> | TAKEN | (ELIMINATION HAPPENED) - V | the permit +-------+ - +--------+ | - | PERMIT | -< - +--------+ | - | `release` has waited a bounded time, +--------+ - +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED) - but `acquire` has not come +--------+ - */ - - private val head: AtomicRef - private val deqIdx = atomic(0L) - private val tail: AtomicRef - private val enqIdx = atomic(0L) - +private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : SegmentQueueSynchronizer(), Semaphore { init { require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" } require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" } - val s = SemaphoreSegment(0, null, 2) - head = atomic(s) - tail = atomic(s) } /** @@ -148,16 +104,19 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se private val _availablePermits = atomic(permits - acquiredPermits) override val availablePermits: Int get() = max(_availablePermits.value, 0) - override fun tryAcquire(): Boolean { - _availablePermits.loop { p -> - if (p <= 0) return false - if (_availablePermits.compareAndSet(p, p - 1)) return true - } + override fun tryAcquire(): Boolean = _availablePermits.loop { p -> + // Try to decrement the number of available + // permits if it is greater than zero. + if (p <= 0) return false + if (_availablePermits.compareAndSet(p, p - 1)) return true } override suspend fun acquire() { + // Decrement the number of available permits. val p = _availablePermits.getAndDecrement() - if (p > 0) return // permit acquired + // Is the permit acquired? + if (p > 0) return + // Try to suspend otherwise. // While it looks better when the following function is inlined, // it is important to make `suspend` function invocations in a way // so that the tail-call optimization can be applied. @@ -166,134 +125,36 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se private suspend fun acquireSlowPath() = suspendAtomicCancellableCoroutineReusable sc@ { cont -> while (true) { - if (addAcquireToQueue(cont)) return@sc + // Try to suspend. + if (suspend(cont)) return@sc + // The suspension has been failed + // due to the synchronous resumption mode. + // Restart the whole acquire, and decrement + // the number of available permits at first. val p = _availablePermits.getAndDecrement() - if (p > 0) { // permit acquired + // Is the permit acquired? + if (p > 0) { cont.resume(Unit) return@sc } + // Otherwise, go to the beginning of + // the loop and try to suspend otherwise. } } override fun release() { while (true) { + // Increment the number of available permits + // if it does not exceed the maximum value. val p = _availablePermits.getAndUpdate { cur -> check(cur < permits) { "The number of released permits cannot be greater than $permits" } cur + 1 } + // Is there a waiter that should be resumed? if (p >= 0) return - if (tryResumeNextFromQueue()) return + // Try to resume the first waiter, and + // re-start the operation if it is cancelled. + if (resume(Unit)) return } } - - /** - * Returns `false` if the received permit cannot be used and the calling operation should restart. - */ - private fun addAcquireToQueue(cont: CancellableContinuation): Boolean { - val curTail = this.tail.value - val enqIdx = enqIdx.getAndIncrement() - val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail, - createNewSegment = ::createSegment).segment // cannot be closed - val i = (enqIdx % SEGMENT_SIZE).toInt() - // the regular (fast) path -- if the cell is empty, try to install continuation - if (segment.cas(i, null, cont)) { // installed continuation successfully - cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler) - return true - } - // On CAS failure -- the cell must be either PERMIT or BROKEN - // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it - if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair - cont.resume(Unit) - return true - } - assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it - return false // broken cell, need to retry on a different cell - } - - @Suppress("UNCHECKED_CAST") - private fun tryResumeNextFromQueue(): Boolean { - val curHead = this.head.value - val deqIdx = deqIdx.getAndIncrement() - val id = deqIdx / SEGMENT_SIZE - val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead, - createNewSegment = ::createSegment).segment // cannot be closed - segment.cleanPrev() - if (segment.id > id) return false - val i = (deqIdx % SEGMENT_SIZE).toInt() - val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state - when { - cellState === null -> { - // Acquire has not touched this cell yet, wait until it comes for a bounded time - // The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue - repeat(MAX_SPIN_CYCLES) { - if (segment.get(i) === TAKEN) return true - } - // Try to break the slot in order not to wait - return !segment.cas(i, PERMIT, BROKEN) - } - cellState === CANCELLED -> return false // the acquire was already cancelled - else -> return (cellState as CancellableContinuation).tryResume() - } - } -} - -private fun CancellableContinuation.tryResume(): Boolean { - val token = tryResume(Unit) ?: return false - completeResume(token) - return true -} - -private class CancelSemaphoreAcquisitionHandler( - private val segment: SemaphoreSegment, - private val index: Int -) : CancelHandler() { - override fun invoke(cause: Throwable?) { - segment.cancel(index) - } - - override fun toString() = "CancelSemaphoreAcquisitionHandler[$segment, $index]" -} - -private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0) - -private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment(id, prev, pointers) { - val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) - override val maxSlots: Int get() = SEGMENT_SIZE - - @Suppress("NOTHING_TO_INLINE") - inline fun get(index: Int): Any? = acquirers[index].value - - @Suppress("NOTHING_TO_INLINE") - inline fun set(index: Int, value: Any?) { - acquirers[index].value = value - } - - @Suppress("NOTHING_TO_INLINE") - inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) - - @Suppress("NOTHING_TO_INLINE") - inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value) - - // Cleans the acquirer slot located by the specified index - // and removes this segment physically if all slots are cleaned. - fun cancel(index: Int) { - // Clean the slot - set(index, CANCELLED) - // Remove this segment if needed - onSlotCleaned() - } - - override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" -} -@SharedImmutable -private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100) -@SharedImmutable -private val PERMIT = Symbol("PERMIT") -@SharedImmutable -private val TAKEN = Symbol("TAKEN") -@SharedImmutable -private val BROKEN = Symbol("BROKEN") -@SharedImmutable -private val CANCELLED = Symbol("CANCELLED") -@SharedImmutable -private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16) +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueSynchronizerLCStressTests.kt b/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueSynchronizerLCStressTests.kt new file mode 100644 index 0000000000..e71ea6d22e --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueSynchronizerLCStressTests.kt @@ -0,0 +1,770 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("unused") +package kotlinx.coroutines.linearizability + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.* +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.CancellationMode.* +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.* +import kotlinx.coroutines.sync.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.verifier.* +import org.junit.* +import kotlin.coroutines.* +import kotlin.reflect.* + +// This test suit serves two purposes. First of all, it tests the `SegmentQueueSynchronizer` +// implementation under different use-cases and workloads. On the other side, this test suite +// provides different well-known synchronization and communication primitive implementations +// via `SegmentQueueSynchronizer`, which can be considered as an API richness check as well as +// a collection of examples on how to use `SegmentQueueSynchronizer` to build new primitives. + +// ############## +// # SEMAPHORES # +// ############## + +/** + * This [Semaphore] implementation is similar to the one in the library, + * but uses the [asynchronous][ASYNC] mode for resumptions. However, + * it is hard to make [tryAcquire] linearizable in this case, so that + * it is simply not supported here. + */ +internal class AsyncSemaphore(permits: Int) : SegmentQueueSynchronizer(), Semaphore { + override val resumeMode get() = ASYNC + + private val _availablePermits = atomic(permits) + override val availablePermits get() = _availablePermits.value.coerceAtLeast(0) + + override fun tryAcquire() = error("Not supported in the ASYNC version") + + override suspend fun acquire() { + // Decrement the number of available permits. + val p = _availablePermits.getAndDecrement() + // Is the permit successfully acquired? + if (p > 0) return + // Suspend otherwise. + suspendAtomicCancellableCoroutine { cont -> + check(suspend(cont)) { "Should not fail in ASYNC mode" } + } + } + + override fun release() { + while (true) { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Is there a waiter that should be resumed? + if (p >= 0) return + // Try to resume the first waiter, and + // re-start the operation if it is cancelled. + if (resume(Unit)) return + } + } +} + +/** + * This semaphore implementation is correct only if [release] is always + * invoked after a successful [acquire]; in other words, when semaphore + * is used properly, without unexpected [release] invocations. The main + * advantage is using smart cancellation, so that [release] always works + * in constant time under no contention, and the cancelled [acquire] + * requests do not play any role. It is worth noting, that it is possible + * to make this implementation correct under not atomic but strong cancellation + * model, when continuation can be cancelled if it is logically resumed + * but not dispatched yet. + */ +internal class AsyncSemaphoreSmart(permits: Int) : SegmentQueueSynchronizer(), Semaphore { + override val resumeMode get() = ASYNC + override val cancellationMode get() = SMART_SYNC + + private val _availablePermits = atomic(permits) + override val availablePermits get() = _availablePermits.value.coerceAtLeast(0) + + override fun tryAcquire() = error("Not supported in the ASYNC version") + + override suspend fun acquire() { + // Decrement the number of available permits. + val p = _availablePermits.getAndDecrement() + // Is the permit acquired? + if (p > 0) return + // Suspend otherwise. + suspendAtomicCancellableCoroutine { cont -> + check(suspend(cont)) { "Should not fail in ASYNC mode" } + } + } + + override fun release() { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Is there a waiter that should be resumed? + if (p >= 0) return + // Resume the first waiter. Due to the smart + // cancellation it is possible that this + // permit will be refused, so that the real + // release can come with a small lag, but it + // is guaranteed to be processed eventually. + resume(Unit) + } + + override fun onCancellation(): Boolean { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Return `true` if there is no `release` which + // is going to resume us and cannot skip us and + // resume the next waiter. + return p < 0 + } +} + +/** + * This implementation is similar to the previous one, but uses [synchronous][SYNC] + * resumption mode, so that it is possible to implement [tryAcquire] atomically. + * The only notable difference happens when a permit to be released is refused, + * and the following [resume] attempt in the cancellation handler fails due to + * the synchronization on resumption, so that the permit is going to be returned + * back to the semaphore in [returnValue] function. + */ +internal class SyncSemaphoreSmart(permits: Int) : SegmentQueueSynchronizer(), Semaphore { + override val resumeMode get() = SYNC + override val cancellationMode get() = SMART_SYNC + + private val _availablePermits = atomic(permits) + override val availablePermits get() = _availablePermits.value.coerceAtLeast(0) + + override suspend fun acquire() { + while (true) { + // Decrement the number of available permits. + val p = _availablePermits.getAndDecrement() + // Is the permit acquired? + if (p > 0) return + // Try to suspend otherwise. + val acquired = suspendAtomicCancellableCoroutine { cont -> + if (!suspend(cont)) cont.resume(false) + } + if (acquired) return + } + } + + override fun tryAcquire(): Boolean = _availablePermits.loop { cur -> + // Try to decrement the number of available + // permits if it is greater than zero. + if (cur <= 0) return false + if (_availablePermits.compareAndSet(cur, cur -1)) return true + } + + override fun release() { + while (true) { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Is there a waiter that should be resumed? + if (p >= 0) return + // Try to resume the first waiter, can fail + // according to the SYNC mode contract. + if (resume(true)) return + } + } + + override fun onCancellation(): Boolean { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Return `true` if there is no `release` which + // is going to resume us and cannot skip us and + // resume the next waiter. + return p < 0 + } + + override fun returnValue(value: Boolean) { + // Simply release the permit. + release() + } +} + +abstract class AsyncSemaphoreLCStressTestBase(semaphore: Semaphore, val seqSpec: KClass<*>) { + private val s = semaphore + + @Operation + suspend fun acquire() = s.acquire() + + @Operation + fun release() = s.release() + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .sequentialSpecification(seqSpec.java) + .check(this::class) +} + +class SemaphoreUnboundedSequential1 : SemaphoreSequential(1, false) +class SemaphoreUnboundedSequential2 : SemaphoreSequential(2, false) + +class AsyncSemaphore1LCStressTest : AsyncSemaphoreLCStressTestBase(AsyncSemaphore(1), SemaphoreUnboundedSequential1::class) +class AsyncSemaphore2LCStressTest : AsyncSemaphoreLCStressTestBase(AsyncSemaphore(2), SemaphoreUnboundedSequential2::class) + +class AsyncSemaphoreSmart1LCStressTest : AsyncSemaphoreLCStressTestBase(AsyncSemaphoreSmart(1), SemaphoreUnboundedSequential1::class) +class AsyncSemaphoreSmart2LCStressTest : AsyncSemaphoreLCStressTestBase(AsyncSemaphoreSmart(2), SemaphoreUnboundedSequential2::class) + +class SyncSemaphoreSmart1LCStressTest : SemaphoreLCStressTestBase(SyncSemaphoreSmart(1), SemaphoreUnboundedSequential1::class) +class SyncSemaphoreSmart2LCStressTest : SemaphoreLCStressTestBase(SyncSemaphoreSmart(2), SemaphoreUnboundedSequential2::class) + + +// #################################### +// # COUNT-DOWN-LATCH SYNCHRONIZATION # +// #################################### + +/** + * This primitive allows to wait until several operation are completed. + * It is initialized with a given count, and each [countDown] invocation + * decrements the count of remaining operations to be completed. At the + * same time, [await] waits until the count reaches zero. + * + * This implementation uses simple cancellation, so that the [countDown] + * invocation that reaches the counter zero works in a linear of the number of [await] + * invocations, including the ones that are already cancelled. + */ +internal open class CountDownLatch(count: Int) : SegmentQueueSynchronizer() { + override val resumeMode get() = ASYNC + + private val count = atomic(count) + // The number of suspended `await` invocation. + // DONE_MARK should be set when the count reaches + // zero, so that the following suspension attempts + // can fail and complete immediately. + private val waiters = atomic(0) + + protected fun decWaiters() = waiters.decrementAndGet() + + /** + * Decrements the count and resumes waiting + * [await] invocations if it reaches zero. + */ + fun countDown() { + // Decrement the count. + val r = count.decrementAndGet() + // Should the waiters be resumed? + if (r <= 0) resumeWaiters() + } + + private fun resumeWaiters() { + val w = waiters.getAndUpdate { cur -> + // Is the done mark set? + if (cur and DONE_MARK != 0) return + cur or DONE_MARK + } + // This thread has successfully set + // the mark, resume the waiters. + repeat(w) { resume(Unit) } + } + + /** + * Waits until the count reaches zero, + * completes immediately if it is already zero. + */ + suspend fun await() { + // Check whether the count has been reached zero, + // this can be considered as an optimization. + if (remaining() == 0) return + // Increment the number of waiters and check + // that DONE_MARK is not set, finish otherwise. + val w = waiters.incrementAndGet() + if (w and DONE_MARK != 0) return + // The number of waiters is + // successfully incremented, suspend. + suspendAtomicCancellableCoroutine { suspend(it) } + } + + /** + * Return the current count. + */ + fun remaining(): Int = count.value.coerceAtLeast(0) + + protected companion object { + const val DONE_MARK = 1 shl 31 + } +} + +/** + * This implementation uses a smarter cancellation mechanism, so that the + * [countDown] invocation that reaches the counter zero works in linear of + * the number of non-cancelled [await] invocations. This way, it does not matter + * how many [await] requests has been cancelled -- they do not play any role. + */ +internal class CountDownLatchSmart(count: Int) : CountDownLatch(count) { + override val cancellationMode get() = SMART_ASYNC + + override fun onCancellation(): Boolean { + // Decrement the number of waiters. + val w = decWaiters() + // Succeed if the DONE_MARK is not set yet. + return (w and DONE_MARK) == 0 + } +} + +internal abstract class CountDownLatchLCStressTestBase(val cdl: CountDownLatch, val seqSpec: KClass<*>) { + @Operation + fun countDown() = cdl.countDown() + + @Operation + fun remaining() = cdl.remaining() + + @Operation + suspend fun await() = cdl.await() + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .actorsAfter(0) + .sequentialSpecification(seqSpec.java) + .check(this::class) +} + +class CountDownLatchSequential1 : CountDownLatchSequential(1) +class CountDownLatchSequential2 : CountDownLatchSequential(2) + +internal class CountDownLatch1LCStressTest : CountDownLatchLCStressTestBase(CountDownLatch(1), CountDownLatchSequential1::class) +internal class CountDownLatch2LCStressTest : CountDownLatchLCStressTestBase(CountDownLatch(2), CountDownLatchSequential2::class) + +internal class CountDownLatchSmart1LCStressTest : CountDownLatchLCStressTestBase(CountDownLatchSmart(1), CountDownLatchSequential1::class) +internal class CountDownLatchSmart2LCStressTest : CountDownLatchLCStressTestBase(CountDownLatchSmart(2), CountDownLatchSequential2::class) + +open class CountDownLatchSequential(initialCount: Int) : VerifierState() { + private var count = initialCount + private val waiters = ArrayList>() + + fun countDown() { + if (--count == 0) { + waiters.forEach { it.tryResume0(Unit) } + waiters.clear() + } + } + + suspend fun await() { + if (count <= 0) return + suspendAtomicCancellableCoroutine { cont -> + waiters.add(cont) + } + } + + fun remaining(): Int = count.coerceAtLeast(0) + + override fun extractState() = remaining() +} + + +// ########################### +// # BARRIER SYNCHRONIZATION # +// ########################### + +/** + * This synchronization primitive allows a set of coroutines to + * all wait for each other to reach a common barrier point. + * + * The implementation is straightforward: it maintains a counter + * of arrived coroutines and increments it in the beginning of + * [arrived] operation. The last coroutines should resume all the + * previous ones. + * + * In case of cancellation, the handler decrements the counter if + * not all the parties are arrived. However, it is impossible to + * make cancellation atomic (e.g., Java's implementation simply + * does not work in case of thread interruption) since there is + * no way to resume a set of coroutines atomically. Thus, + * this implementation is non-atomic if cancellation happens + * simultaneously to the last [arrive], but is correct under + * the strong cancellation model, when continuation can be + * cancelled if it is logically resumed but not dispatched yet. + */ +internal class Barrier(private val parties: Int) : SegmentQueueSynchronizer() { + override val resumeMode get() = ASYNC + override val cancellationMode get() = SMART_ASYNC + + // The number of coroutines arrived to this barrier point. + private val arrived = atomic(0L) + + /** + * Waits for other parties and returns `true`. + * Fails if this invocation exceeds the number + * of parties, returns `false` in this case. + */ + suspend fun arrive(): Boolean { + // Increment the number of arrived parties. + val a = arrived.incrementAndGet() + return when { + // Should we suspend? + a < parties -> { + suspendCoroutine { cont -> suspend(cont) } + true + } + // Are we the last party? + a == parties.toLong() -> { + // Resume all waiters. + repeat(parties - 1) { + resume(Unit) + } + true + } + // Should we fail? + else -> false + } + } + + override fun onCancellation(): Boolean { + // Decrement the number of arrived parties if possible. + arrived.loop { cur -> + // Are we going to be resumed? + // The resumption permit should be refused in this case. + if (cur == parties.toLong()) return false + // Successful cancellation, return `true`. + if (arrived.compareAndSet(cur, cur - 1)) return true + } + } +} + +// TODO: non-atomic cancellation test, the corresponding feature in lincheck is required. +abstract class BarrierLCStressTestBase(parties: Int, val seqSpec: KClass<*>) { + private val b = Barrier(parties) + + @Operation(cancellableOnSuspension = false) + suspend fun arrive() = b.arrive() + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .actorsAfter(0) + .threads(3) + .sequentialSpecification(seqSpec.java) + .check(this::class) +} + +class BarrierSequential1 : BarrierSequential(1) +class Barrier1LCStressTest : BarrierLCStressTestBase(1, BarrierSequential1::class) +class BarrierSequential2 : BarrierSequential(2) +class Barrier2LCStressTest : BarrierLCStressTestBase(2, BarrierSequential2::class) +class BarrierSequential3 : BarrierSequential(3) +class Barrier3LCStressTest : BarrierLCStressTestBase(3, BarrierSequential3::class) + +open class BarrierSequential(parties: Int) : VerifierState() { + private var remainig = parties + private val waiters = ArrayList>() + + suspend fun arrive(): Boolean { + val r = --remainig + return when { + r > 0 -> { + suspendAtomicCancellableCoroutine { cont -> + waiters.add(cont) + cont.invokeOnCancellation { + remainig++ + waiters.remove(cont) + } + } + true + } + r == 0 -> { + waiters.forEach { it.resume(Unit) } + true + } + else -> false + } + } + + override fun extractState() = remainig > 0 +} + + +// ################## +// # BLOCKING POOLS # +// ################## + +/** + * While using resources such as database connections, sockets, etc., + * it is typical to reuse them; that requires a fast and handy mechanism. + * This [BlockingPool] abstraction maintains a set of elements that can be put + * into the pool for further reuse or be retrieved to process the current operation. + * When [retrieve] comes to an empty pool, it blocks, and the following [put] operation + * resumes it; all the waiting requests are processed in the first-in-first-out (FIFO) order. + * + * In our tests we consider two pool implementations: the [queue-based][BlockingQueuePool] + * and the [stack-based][BlockingStackPool]. Intuitively, the queue-based implementation is + * faster since it is built on arrays and uses `Fetch-And-Add`-s on the contended path, + * while the stack-based pool retrieves the last inserted, thus the "hottest", elements. + * + * Please note that both these implementations are not atomic and can retrieve elements + * out-of-order under some races. However, since pools by themselves do not guarantee + * that the stored elements are ordered (the one may consider them as bags), + * these queue- and stack-based versions should be considered as pools with specific heuristics. + */ +interface BlockingPool { + /** + * Either resumes the first waiting [retrieve] operation + * and passes the [element] to it, or simply put the + * [element] into the pool. + */ + fun put(element: T) + + /** + * Retrieves one of the elements from the pool + * (the order is not specified), or suspends if it is + * empty -- the following [put] operations resume + * waiting [retrieve]-s in the first-in-first-out order. + */ + suspend fun retrieve(): T +} + +/** + * This pool uses queue under the hood and implemented withing the simple cancellation technique. + */ +internal class BlockingQueuePool : SegmentQueueSynchronizer(), BlockingPool { + override val resumeMode get() = ASYNC + + // > 0 -- number of elements; + // = 0 -- empty pool; + // < 0 -- number of waiters. + private val availableElements = atomic(0L) + + // This is an infinite array by design, a plain array is used for simplicity. + private val elements = atomicArrayOfNulls(100) + + // Indices in [elements] for the next [tryInsert] and [tryRetrieve] operations. + // Each [tryInsert]/[tryRetrieve] pair works on a separate slot. When [tryRetrieve] + // comes earlier, it marks the slot as [BROKEN] so that both this operation and the + // corresponding [tryInsert] fail. + private val insertIdx = atomic(0) + private val retrieveIdx = atomic(0) + + override fun put(element: T) { + while (true) { + // Increment the number of elements in advance. + val b = availableElements.getAndIncrement() + // Is there a waiting `retrieve`? + if (b < 0) { + // Try to resume the first waiter, + // can fail if it is already cancelled. + if (resume(element)) return + } else { + // Try to insert the element into the + // queue, can fail if the slot is broken. + if (tryInsert(element)) return + } + } + } + + /** + * Tries to insert the [element] into the next + * [elements] array slot. Returns `true` if + * succeeds, or `false` if the slot is [broken][BROKEN]. + */ + private fun tryInsert(element: T): Boolean { + val i = insertIdx.getAndIncrement() + return elements[i].compareAndSet(null, element) + } + + override suspend fun retrieve(): T { + while (true) { + // Decrements the number of elements. + val b = availableElements.getAndDecrement() + // Is there an element in the pool? + if (b > 0) { + // Try to retrieve the first element, + // can fail if the first [elements] slot + // is empty due to a race. + val x = tryRetrieve() + if (x != null) return x + } else { + // The pool is empty, suspend. + return suspendAtomicCancellableCoroutine { cont -> + suspend(cont) + } + } + } + } + + /** + * Tries to retrieve the first element from + * the [elements] array. Returns the element if + * succeeds, or `null` if the first slot is empty + * due to a race -- it marks the slot as [broken][BROKEN] + * in this case, so that the corresponding [tryInsert] + * invocation fails. + */ + private fun tryRetrieve(): T? { + val i = retrieveIdx.getAndIncrement() + return elements[i].getAndSet(BROKEN) as T? + } + + companion object { + @JvmStatic + val BROKEN = Symbol("BROKEN") + } +} + +/** + * This pool uses stack under the hood and shows how to use smart cancellation + * for such data structures with resources. + */ +internal class BlockingStackPool : SegmentQueueSynchronizer(), BlockingPool { + override val resumeMode get() = ASYNC + override val cancellationMode get() = SMART_SYNC + + // The stack is implemented via a concurrent linked list, + // this is its head; `null` means that the stack is empty. + private val head = atomic?>(null) + + // > 0 -- number of elements; + // = 0 -- empty pool; + // < 0 -- number of waiters. + private val availableElements = atomic(0) + + override fun put(element: T) { + while (true) { + // Increment the number of elements in advance. + val b = availableElements.getAndIncrement() + // Is there a waiting retrieve? + if (b < 0) { + // Resume the first waiter, never fails + // in the smart cancellation mode. + resume(element) + return + } else { + // Try to insert the element into the + // stack, can fail if a concurrent [tryRetrieve] + // came earlier and marked it with a failure node. + if (tryInsert(element)) return + } + } + } + + /** + * Tries to insert the [element] into the stack. + * Returns `true` on success`, or `false` if the + * stack is marked with a failure node, retrieving + * it in this case. + */ + private fun tryInsert(element: T): Boolean = head.loop { h -> + // Is the stack marked with a failure node? + if (h != null && h.element == null) { + // Try to retrieve the failure node. + if (head.compareAndSet(h, h.next)) return false + } else { + // Try to insert the element. + val newHead = StackNode(element, h) + if (head.compareAndSet(h, newHead)) return true + } + } + + override suspend fun retrieve(): T { + while (true) { + // Decrement the number of elements. + val b = availableElements.getAndDecrement() + // Is there an element in the pool? + if (b > 0) { + // Try to retrieve the top element, + // can fail if the stack if empty + // due to a race. + val x = tryRetrieve() + if (x != null) return x + } else { + // The pool is empty, suspend. + return suspendAtomicCancellableCoroutine { cont -> + suspend(cont) + } + } + } + } + + /** + * Try to retrieve the top (last) element and return `true` + * if the stack is not empty, or return `false` and + * insert a failure node otherwise. + */ + private fun tryRetrieve(): T? = head.loop { h -> + // Is the queue empty or full of failure nodes? + if (h == null || h.element == null) { + // Try to add one more failure node and fail. + val failNode = StackNode(null, h) + if (head.compareAndSet(h, failNode)) return null + } else { + // Try to retrieve the top element. + if (head.compareAndSet(h, h.next)) return h.element + } + } + + // The logic of cancellation is very similar to the one + // in semaphore, with the only difference that elements + // should be physically returned to the pool. + override fun onCancellation(): Boolean { + val b = availableElements.getAndIncrement() + return b < 0 + } + + // If an element is refused, it should be inserted back to the stack. + override fun tryReturnRefusedValue(value: T) = tryInsert(value) + + // In order to return the value back + // to the pool [put] is naturally used. + override fun returnValue(value: T) = put(value) + + class StackNode(val element: T?, val next: StackNode?) +} + +abstract class BlockingPoolLCStressTestBase(val p: BlockingPool) { + @Operation + fun put() = p.put(Unit) + + @Operation + suspend fun retrieve() = p.retrieve() + + @Test + fun test() = LCStressOptionsDefault() + .sequentialSpecification(BlockingPoolUnitSequential::class.java) + .check(this::class) +} +class BlockingQueuePoolLCStressTest : BlockingPoolLCStressTestBase(BlockingQueuePool()) +class BlockingStackPoolLCStressTest : BlockingPoolLCStressTestBase(BlockingStackPool()) + +class BlockingPoolUnitSequential : VerifierState() { + private var elements = 0 + private val waiters = ArrayList>() + + fun put() { + while (true) { + if (waiters.isNotEmpty()) { + val w = waiters.removeAt(0) + if (w.tryResume0(Unit)) return + } else { + elements ++ + return + } + } + } + + suspend fun retrieve() { + if (elements > 0) { + elements-- + } else { + suspendAtomicCancellableCoroutine { cont -> + waiters.add(cont) + } + } + } + + override fun extractState() = elements +} + + +// ############# +// # UTILITIES # +// ############# + +/** + * Tries to resume this continuation atomically, + * returns `true` if succeeds and `false` otherwise. + */ +private fun CancellableContinuation.tryResume0(value: T): Boolean { + val token = tryResume(value) ?: return false + completeResume(token) + return true +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt index 52902f4987..51b4cf7863 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt @@ -6,29 +6,70 @@ package kotlinx.coroutines.linearizability import kotlinx.coroutines.* import kotlinx.coroutines.sync.* +import org.jetbrains.kotlinx.lincheck.* import org.jetbrains.kotlinx.lincheck.annotations.Operation import org.jetbrains.kotlinx.lincheck.verifier.* import org.junit.* +import kotlin.reflect.* -abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() { - private val semaphore = Semaphore(permits) +abstract class SemaphoreLCStressTestBase(semaphore: Semaphore, val seqSpec: KClass<*>) { + private val s = semaphore @Operation - fun tryAcquire() = semaphore.tryAcquire() + fun tryAcquire() = s.tryAcquire() @Operation - suspend fun acquire() = semaphore.acquire() + suspend fun acquire() = s.acquire() @Operation(handleExceptionsAsResult = [IllegalStateException::class]) - fun release() = semaphore.release() + fun release() = s.release() @Test fun test() = LCStressOptionsDefault() .actorsBefore(0) + .sequentialSpecification(seqSpec.java) .check(this::class) +} + +open class SemaphoreSequential(val permits: Int, val boundMaxPermits: Boolean) : VerifierState() { + private var availablePermits = permits + private val waiters = ArrayList>() + + fun tryAcquire(): Boolean { + if (availablePermits <= 0) return false + availablePermits-- + return true + } + + suspend fun acquire() { + if (tryAcquire()) return + availablePermits-- + suspendAtomicCancellableCoroutine { cont -> + waiters.add(cont) + } + } - override fun extractState() = semaphore.availablePermits + fun release() { + while (true) { + check(availablePermits < permits || !boundMaxPermits) + availablePermits++ + if (availablePermits > 0) return + val w = waiters.removeAt(0) + if (w.tryResume0(Unit)) return + } + } + + override fun extractState() = availablePermits.coerceAtLeast(0) } -class Semaphore1LCStressTest : SemaphoreLCStressTestBase(1) -class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2) \ No newline at end of file +class SemaphoreSequential1 : SemaphoreSequential(1, true) +class Semaphore1LCStressTest : SemaphoreLCStressTestBase(Semaphore(1), SemaphoreSequential1::class) + +class SemaphoreSequential2 : SemaphoreSequential(2, true) +class Semaphore2LCStressTest : SemaphoreLCStressTestBase(Semaphore(2), SemaphoreSequential2::class) + +private fun CancellableContinuation.tryResume0(value: T): Boolean { + val token = tryResume(value) ?: return false + completeResume(token) + return true +} \ No newline at end of file