Skip to content

CancellableQueueSynchronizer and ReadWriteMutex #2045

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Prev Previous commit
Next Next commit
Documentation + mutex fix attempt
  • Loading branch information
ndkoval committed Mar 4, 2023
commit e1d86bd6aa35096250ea8577875934a0d639613b
8 changes: 4 additions & 4 deletions kotlinx-coroutines-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
testLogging.showStandardStreams = true
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
// Adjust internal algorithmic parameters to increase the testing quality instead of performance.
systemProperty 'kotlinx.coroutines.sqs.segmentSize', '1'
systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '10'
systemProperty 'kotlinx.coroutines.cqs.segmentSize', '1'
systemProperty 'kotlinx.coroutines.cqs.maxSpinCycles', '10'
systemProperty 'kotlinx.coroutines.bufferedChannel.segmentSize', '2'
systemProperty 'kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations', '1'
}
Expand Down Expand Up @@ -302,8 +302,8 @@ static void configureJvmForLincheck(task, additional = false) {
'--add-exports', 'java.base/jdk.internal.util=ALL-UNNAMED'] // in the model checking mode
// Adjust internal algorithmic parameters to increase the testing quality instead of performance.
var segmentSize = additional ? '2' : '1'
task.systemProperty 'kotlinx.coroutines.sqs.segmentSize', segmentSize
task.systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '1' // better for the model checking mode
task.systemProperty 'kotlinx.coroutines.cqs.segmentSize', segmentSize
task.systemProperty 'kotlinx.coroutines.cqs.maxSpinCycles', '1' // better for the model checking mode
task.systemProperty 'kotlinx.coroutines.bufferedChannel.segmentSize', segmentSize
task.systemProperty 'kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations', '1'
}
Expand Down

Large diffs are not rendered by default.

52 changes: 32 additions & 20 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T
}


internal open class MutexImpl(locked: Boolean) : SegmentQueueSynchronizer<Unit>(), Mutex {
internal open class MutexImpl(locked: Boolean) : CancellableQueueSynchronizer<Unit>(), Mutex {
/**
* After the lock is acquired, the corresponding owner is stored in this field.
* The [unlock] operation checks the owner and either re-sets it to [NO_OWNER],
Expand All @@ -148,8 +148,15 @@ internal open class MutexImpl(locked: Boolean) : SegmentQueueSynchronizer<Unit>(
{ unlock(owner) }
}

override val isLocked: Boolean get() =
availablePermits.value <= 0
override val isLocked: Boolean get() {
while (true) {
val p = availablePermits.value
if (p == 1) return false
assert { p <= 0 }
if (owner.value === NO_OWNER) continue
return true
}
}

override fun holdsLock(owner: Any): Boolean {
while (true) {
Expand Down Expand Up @@ -186,10 +193,21 @@ internal open class MutexImpl(locked: Boolean) : SegmentQueueSynchronizer<Unit>(
// locked by our owner.
if (owner != null) {
// Is this mutex locked by our owner?
var curOwner = this.owner.value

val curOwner = this.owner.value
if (curOwner === owner) {
if (suspendCancelled() != null) release()
if (suspendCancelled() != null) {
when (waiter) {
is CancellableContinuation<*> -> {
@Suppress("UNCHECKED_CAST")
waiter as CancellableContinuation<Unit>
waiter.resume(Unit, null)
}
is SelectInstance<*> -> {
waiter.selectInRegistrationPhase(Unit)
}
}
return
}
when (waiter) {
is CancellableContinuation<*> -> {
waiter.resumeWithException(IllegalStateException("ERROR"))
Expand All @@ -199,32 +217,26 @@ internal open class MutexImpl(locked: Boolean) : SegmentQueueSynchronizer<Unit>(
}
}
return
}

while (curOwner === NO_OWNER) {
curOwner = this.owner.value
if (!isLocked) {
if (suspendCancelled() != null) release()
continue@xxx
}
}
if (curOwner === owner) {
if (suspendCancelled() != null) release()
} else if (curOwner === NO_OWNER) {
if (suspendCancelled() == null) continue@xxx
when (waiter) {
is CancellableContinuation<*> -> {
waiter.resumeWithException(IllegalStateException("ERROR"))
@Suppress("UNCHECKED_CAST")
waiter as CancellableContinuation<Unit>
waiter.resume(Unit, null)
}
is SelectInstance<*> -> {
waiter.selectInRegistrationPhase(ON_LOCK_ALREADY_LOCKED_BY_OWNER)
waiter.selectInRegistrationPhase(Unit)
}
}
return
}

// This mutex is either locked by another owner or unlocked.
// In the latter case, it is possible that it WAS locked by
// our owner when the semaphore permit acquisition has failed.
// To preserve linearizability, the operation restarts in this case.
// if (!isLocked) continuex
// if (!isLocked) continue
}
if (suspend(waiter)) return
} else {
Expand Down
60 changes: 30 additions & 30 deletions kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.sync

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.internal.SegmentQueueSynchronizer.CancellationMode.*
import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.*
import kotlinx.coroutines.internal.CancellableQueueSynchronizer.CancellationMode.*
import kotlinx.coroutines.internal.CancellableQueueSynchronizer.ResumeMode.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.ReadWriteMutexImpl.WriteUnlockPolicy.*
import kotlin.contracts.*
Expand Down Expand Up @@ -158,7 +158,7 @@ public suspend inline fun <T> ReadWriteMutex.write(action: () -> T): T =
* for the lock. This tuple represents the current state of the readers-writer mutex and
* is split into [waitingReaders] and [state] fields -- it is impossible to store everything
* in a single register since its maximal capacity is 64 bit, and this is not sufficient
* for three counters and several flags. Additionally, separate [SegmentQueueSynchronizer]-s
* for three counters and several flags. Additionally, separate [CancellableQueueSynchronizer]-s
* are used for waiting readers and writers.
*
* To acquire a reader lock, the algorithm checks whether the writer lock is held or there is a writer
Expand Down Expand Up @@ -190,21 +190,21 @@ public suspend inline fun <T> ReadWriteMutex.write(action: () -> T): T =
* comments and appear almost everywhere.
*/
internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
// The number of coroutines waiting for a reader lock in `sqsReaders`.
// The number of coroutines waiting for a reader lock in `cqsReaders`.
private val waitingReaders = atomic(0)
// This state field contains several counters and is always updated atomically by `CAS`:
// - `AR` (active readers) is a 30-bit counter which represents the number
// of coroutines holding a read lock;
// - `WLA` (writer lock acquired) is a flag which is `true` when
// the writer lock is acquired;
// - `WW` (waiting writers) is a 30-bit counter which represents the number
// of coroutines waiting for the writer lock in `sqsWriters`;
// of coroutines waiting for the writer lock in `cqsWriters`;
// - `RWR` (resuming waiting readers) is a flag which is `true` when waiting readers
// resumption is in progress.
private val state = atomic(0L)

private val sqsReaders = ReadersSQS() // the place where readers should suspend and be resumed
private val sqsWriters = WritersSQS() // the place where writers should suspend and be resumed
private val cqsReaders = ReadersCQS() // the place where readers should suspend and be resumed
private val cqsWriters = WritersCQS() // the place where writers should suspend and be resumed

private var curUnlockPolicy = false // false -- prioritize readers on the writer lock release
// true -- prioritize writers on the writer lock release
Expand Down Expand Up @@ -265,9 +265,9 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
// Is there a writer holding the lock or waiting for it?
if (s.wla || s.ww > 0) {
// The number of waiting readers was incremented
// correctly, wait for a reader lock in `sqsReaders`.
// correctly, wait for a reader lock in `cqsReaders`.
suspendCancellableCoroutineReusable<Unit> { cont ->
sqsReaders.suspend(cont as Waiter)
cqsReaders.suspend(cont as Waiter)
}
return
} else {
Expand All @@ -276,18 +276,18 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
// it could already become zero due to a concurrent `write.unlock()`
// which reads the number of waiting readers, replaces it with `0`,
// and resumes all these readers. In this case, it is guaranteed
// that a reader lock will be provided via `sqsReaders`.
// that a reader lock will be provided via `cqsReaders`.
while (true) {
// Read the current number of waiting readers.
val wr = waitingReaders.value
// Is our invocation already handled by a concurrent
// `write.unlock()` and a reader lock is going to be
// passed via `sqsReaders`? Suspend in this case --
// passed via `cqsReaders`? Suspend in this case --
// it is guaranteed that the lock will be provided
// when this concurrent `write.unlock()` completes.
if (wr == 0) {
suspendCancellableCoroutineReusable<Unit> { cont ->
sqsReaders.suspend(cont as Waiter)
cqsReaders.suspend(cont as Waiter)
}
return
}
Expand Down Expand Up @@ -320,7 +320,7 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
// Try to decrement the number of waiting writers and set the `WLA` flag.
// Resume the first waiting writer on success.
if (state.compareAndSet(s, state(0, true, s.ww - 1, false))) {
sqsWriters.resume(Unit)
cqsWriters.resume(Unit)
return
}
} else {
Expand All @@ -342,12 +342,12 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
}

/**
* This customization of [SegmentQueueSynchronizer] for waiting readers
* This customization of [CancellableQueueSynchronizer] for waiting readers
* use the asynchronous resumption mode and smart cancellation mode,
* so neither [suspend] nor [resume] fail. However, to support
* `tryReadLock()` the synchronous resumption mode should be used.
*/
private inner class ReadersSQS : SegmentQueueSynchronizer<Unit>() {
private inner class ReadersCQS : CancellableQueueSynchronizer<Unit>() {
override val resumeMode get() = ASYNC
override val cancellationMode get() = SMART

Expand Down Expand Up @@ -384,7 +384,7 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
// tries to change the state by atomically setting the `WLA` flag.
// Otherwise, if the writer lock cannot be acquired immediatelly,
// it increments the number of waiting writers and suspends in
// `sqsWriters` waiting for the lock.
// `cqsWriters` waiting for the lock.
while (true) {
// Read the current state.
val s = state.value
Expand All @@ -397,10 +397,10 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
return
} else {
// The lock cannot be acquired immediately, and this operation has to suspend.
// Try to increment the number of waiting writers and suspend in `sqsWriters`.
// Try to increment the number of waiting writers and suspend in `cqsWriters`.
if (state.compareAndSet(s, state(s.ar, s.wla, s.ww + 1, s.rwr))) {
suspendCancellableCoroutineReusable<Unit> { cont ->
sqsWriters.suspend(cont as Waiter)
cqsWriters.suspend(cont as Waiter)
}
return
}
Expand Down Expand Up @@ -428,7 +428,7 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
// but has a lot of corner cases that should be properly managed.
// If the next writer should be resumed (see `PRIORITIZE_WRITERS` policy),
// the algorithm tries to atomically decrement the number of waiting writers
// and keep the `WLA` flag, resuming the first writer in `sqsWriters` after that.
// and keep the `WLA` flag, resuming the first writer in `cqsWriters` after that.
// Otherwise, if the `PRIORITIZE_READERS` policy is used or there is no waiting writer,
// the algorithm sets the `RWR` (resuming waiting readers) flag and invokes a special
// `completeWaitingReadersResumption()` to resume all the waiting readers.
Expand All @@ -443,9 +443,9 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
val resumeWriter = (s.ww > 0) && (policy == PRIORITIZE_WRITERS || policy == ROUND_ROBIN && curUnlockPolicy)
if (resumeWriter) {
// Resume the next writer - try to decrement the number of waiting
// writers and resume the first one in `sqsWriters` on success.
// writers and resume the first one in `cqsWriters` on success.
if (state.compareAndSet(s, state(0, true, s.ww - 1, false))) {
sqsWriters.resume(Unit)
cqsWriters.resume(Unit)
return
}
} else {
Expand Down Expand Up @@ -483,9 +483,9 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
state(s.ar + wr, false, s.ww, true)
}
}
// After the readers are resumed logically, they should be resumed physically in `sqsReaders`.
// After the readers are resumed logically, they should be resumed physically in `cqsReaders`.
repeat(wr) {
sqsReaders.resume(Unit)
cqsReaders.resume(Unit)
}
// Once all the waiting readers are resumed, the `RWR` flag should be reset.
// It is possible that all the resumed readers have already completed their
Expand All @@ -502,7 +502,7 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
}
if (resumeWriter) {
// Resume the next writer physically and finish
sqsWriters.resume(Unit)
cqsWriters.resume(Unit)
return
}
// Meanwhile, it could be possible for a writer to come and suspend due to the `RWR` flag.
Expand All @@ -524,25 +524,25 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
}

/**
* This customization of [SegmentQueueSynchronizer] for waiting writers
* This customization of [CancellableQueueSynchronizer] for waiting writers
* uses the asynchronous resumption mode and smart cancellation mode,
* so neither [suspend] nor [resume] fail. However, in order to support
* `tryWriteLock()` the synchronous resumption mode should be used instead.
*/
private inner class WritersSQS : SegmentQueueSynchronizer<Unit>() {
private inner class WritersCQS : CancellableQueueSynchronizer<Unit>() {
override val resumeMode get() = ASYNC
override val cancellationMode get() = SMART

override fun onCancellation(): Boolean {
// In general, on cancellation, the algorithm tries to decrement the number of waiting writers.
// Similarly to the cancellation logic for readers, if the number of waiting writers has already reached 0,
// the current canceling writer will be resumed in `sqsWriters`. In this case, the function returns
// the current canceling writer will be resumed in `cqsWriters`. In this case, the function returns
// `false`, and the permit will be returned via `returnValue()`. Otherwise, if the number of waiting
// writers >= 1, the decrement is sufficient. However, if this canceling writer is the last waiting one,
// the algorithm sets the `RWR` flag and resumes waiting readers. This logic is similar to `writeUnlock(..)`.
while (true) {
val s = state.value // Read the current state.
if (s.ww == 0) return false // Is this writer going to be resumed in `sqsWriters`?
if (s.ww == 0) return false // Is this writer going to be resumed in `cqsWriters`?
// Is this writer the last one and is the readers resumption valid?
if (s.ww == 1 && !s.wla && !s.rwr) {
// Set the `RWR` flag and resume the waiting readers.
Expand Down Expand Up @@ -579,7 +579,7 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
"<wr=${waitingReaders.value},ar=${state.value.ar}" +
",wla=${state.value.wla},ww=${state.value.ww}" +
",rwr=${state.value.rwr}" +
",sqs_r={$sqsReaders},sqs_w={$sqsWriters}>"
",cqs_r={$cqsReaders},cqs_w={$cqsWriters}>"

internal enum class WriteUnlockPolicy { PRIORITIZE_READERS, PRIORITIZE_WRITERS, ROUND_ROBIN }
}
Expand Down
3 changes: 1 addition & 2 deletions kotlinx-coroutines-core/common/src/sync/Semaphore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.math.*

/**
Expand Down Expand Up @@ -107,7 +106,7 @@ public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
internal open class SemaphoreImpl(
private val permits: Int,
acquiredPermits: Int
) : SegmentQueueSynchronizer<Unit>(), Semaphore {
) : CancellableQueueSynchronizer<Unit>(), Semaphore {
init {
require(permits > 0) { "Semaphore must have at least 1 permit, but is initialized with $permits" }
require(acquiredPermits in 0..permits) { "The number of acquired permits should be in range [0..$permits]" }
Expand Down
Loading