Skip to content

Commit

Permalink
Reduce the memory consumption slightly
Browse files Browse the repository at this point in the history
In exchange, now, removal is linear in the size of number of
registered handlers instead of being amortized constant.
  • Loading branch information
dkhalanskyjb committed Apr 11, 2024
1 parent 9b9148c commit 03a42b9
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 74 deletions.
55 changes: 36 additions & 19 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren

private fun notifyHandlers(list: NodeList, permissionBitmask: Byte, cause: Throwable?, predicate: (JobNode) -> Boolean) {
var exception: Throwable? = null
list.forEach(forbidBitmask = permissionBitmask) { node ->
list.forEach(forbidBitmask = permissionBitmask) { node, _, _ ->
if (node is JobNode && predicate(node)) {
try {
node.invoke(cause)
Expand Down Expand Up @@ -558,7 +558,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren

private fun promoteSingleToNodeList(state: JobNode) {
// try to promote it to list (SINGLE+ state)
_state.compareAndSet(state, state.attachToList(NodeList()))
val list = NodeList()
val address = list.addLastWithoutModifying(state, permissionsBitmask = 0)
assert { address == 0L }
_state.compareAndSet(state, list)
}

public final override suspend fun join() {
Expand Down Expand Up @@ -621,7 +624,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
is Incomplete -> { // may have a list of completion handlers
// remove node from the list if there is a list
if (state.list != null) node.remove()
state.list?.remove(node)
return
}
else -> return // it is complete and does not have any completion handlers
Expand Down Expand Up @@ -932,39 +935,52 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private val Any?.exceptionOrNull: Throwable?
get() = (this as? CompletedExceptionally)?.cause

private fun shouldWaitForChildren(state: Finishing, proposedUpdate: Any?, suggestedStart: ChildHandleNode? = null): Boolean {
private fun shouldWaitForChildren(
state: Finishing,
proposedUpdate: Any?,
suggestedStartSegment: LockFreeLinkedListSegment? = null,
suggestedStartIndex: Int? = null
): Boolean {
val list = state.list
fun tryFindChildren(suggestedStart: ChildHandleNode?, closeList: Boolean): Boolean {
var startAfter: ChildHandleNode? = suggestedStart
fun tryFindChildren(
closeList: Boolean,
suggestedStartSegment: LockFreeLinkedListSegment? = null,
suggestedStartIndex: Int? = null,
): Boolean {
var startSegment = suggestedStartSegment
var startIndex = suggestedStartIndex
while (true) {
val child = run {
list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startAfter = startAfter) {
if (it is ChildHandleNode) return@run it
list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startInSegment = startSegment, startAfterIndex = startIndex) { node, segment, indexInSegment ->
if (node is ChildHandleNode) {
startSegment = segment
startIndex = indexInSegment
return@run node
}
}
null
} ?: break
val handle = child.childJob.invokeOnCompletion(
invokeImmediately = false,
handler = ChildCompletion(this, state, child, proposedUpdate)
handler = ChildCompletion(this, state, startSegment!!, startIndex!!, proposedUpdate)
)
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
startAfter = child
}
return false
}
// Look for children that are currently in the list after the suggested start node.
if (tryFindChildren(suggestedStart = suggestedStart, closeList = false)) return true
if (tryFindChildren(suggestedStartSegment = suggestedStartSegment, suggestedStartIndex = suggestedStartIndex, closeList = false)) return true
// We didn't find anyone in the list after the suggested start node. Let's check the beginning now.
if (suggestedStart != null && tryFindChildren(suggestedStart = null, closeList = false)) return true
if (suggestedStartSegment != null && tryFindChildren(closeList = false)) return true
// Now we know that, at the moment this function started, there were no more children.
// We can close the list for the new children, and if we still don't find any, we can be sure there are none.
return tryFindChildren(suggestedStart = null, closeList = true)
return tryFindChildren(closeList = true)
}

// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
private fun continueCompleting(state: Finishing, proposedUpdate: Any?, lastSegment: LockFreeLinkedListSegment, lastIndexInSegment: Int) {
assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
if (shouldWaitForChildren(state, proposedUpdate, suggestedStart = lastChild)) return // waiting for the next child
if (shouldWaitForChildren(state, proposedUpdate, suggestedStartSegment = lastSegment, suggestedStartIndex = lastIndexInSegment)) return // waiting for the next child
// no more children, now we are sure; try to update the state
val finalState = finalizeFinishingState(state, proposedUpdate)
afterCompletion(finalState)
Expand All @@ -974,7 +990,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
when (val state = this@JobSupport.state) {
is ChildHandleNode -> yield(state.childJob)
is Incomplete -> state.list?.let { list ->
list.forEach { if (it is ChildHandleNode) yield(it.childJob) }
list.forEach { it, _, _ -> if (it is ChildHandleNode) yield(it.childJob) }
}
}
}
Expand Down Expand Up @@ -1232,11 +1248,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private class ChildCompletion(
private val parent: JobSupport,
private val state: Finishing,
private val child: ChildHandleNode,
private val segment: LockFreeLinkedListSegment,
private val indexInSegment: Int,
private val proposedUpdate: Any?
) : JobNode() {
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
parent.continueCompleting(state, proposedUpdate, lastSegment = segment, lastIndexInSegment = indexInSegment)
}
override val onCancelling: Boolean get() = false
}
Expand Down Expand Up @@ -1477,7 +1494,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete {
append(state)
append("}[")
var first = true
this@NodeList.forEach { node ->
this@NodeList.forEach { node, _, _ ->
if (node is JobNode) {
if (first) first = false else append(", ")
append(node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,10 @@ import kotlin.jvm.*
/** @suppress **This is unstable API and it is subject to change.** */
internal open class LockFreeLinkedListNode {
/**
* Try putting this node into a list.
*
* Returns:
* - The new head of the list if the operation succeeded.
* - The head of the list if someone else concurrently added this node to the list,
* but no other modifications to the list were made.
*/
fun attachToList(head: LockFreeLinkedListHead): LockFreeLinkedListHead {
val newAddress = head.addLastWithoutModifying(this, permissionsBitmask = 0)
assert { newAddress != null }
return if (_address.compareAndSet(null, newAddress)) {
head
} else {
_address.value!!.segment.head
}
}

/**
* Remove this node from the list.
* The default value of 0 means that either the node is not in any list or [LockFreeLinkedListHead.addLast] wasn't
* yet called on it.
*/
open fun remove() {
_address.value?.let {
val segment = it.segment
segment.clearSlot(it.index)
}
}

private val _address = atomic<Address?>(null)

val address: Address get() = _address.value!!

internal fun trySetAddress(address: Address) = this._address.compareAndSet(null, address)
var address: Long = 0
}

/** @suppress **This is unstable API and it is subject to change.** */
Expand All @@ -66,13 +38,13 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment(
*/
inline fun forEach(
forbidBitmask: Byte = 0,
startAfter: LockFreeLinkedListNode? = null,
block: (LockFreeLinkedListNode) -> Unit
startInSegment: LockFreeLinkedListSegment? = null,
startAfterIndex: Int? = null,
block: (LockFreeLinkedListNode, LockFreeLinkedListSegment, Int) -> Unit
) {
forbiddenBits.update { it or forbidBitmask.toInt() }
val startAddress = startAfter?.address
var segment: LockFreeLinkedListSegment? = startAddress?.segment ?: head
var startIndex: Int = startAddress?.index?.let { it + 1 } ?: 0
var segment: LockFreeLinkedListSegment? = startInSegment ?: this
var startIndex: Int = startAfterIndex?.let { it + 1 } ?: 0
while (segment != null) {
segment.forEach(forbidBitmask = forbidBitmask, startIndex = startIndex, block = block)
segment = segment.next
Expand All @@ -85,17 +57,15 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment(
* and then sets the [node]'s address to the new address.
*/
fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Boolean {
val address = addLastWithoutModifying(node, permissionsBitmask) ?: return false
val success = node.trySetAddress(address)
assert { success }
node.address = addLastWithoutModifying(node, permissionsBitmask) ?: return false
return true
}

/**
* Adds the [node] to the end of the list if every bit in [permissionsBitmask] is still allowed in the list.
* As opposed to [addLast], doesn't modify the [node]'s address.
*/
fun addLastWithoutModifying(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Address? {
fun addLastWithoutModifying(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Long? {
/** First, avoid modifying the list at all if it was already closed for elements like ours. */
if (permissionsBitmask and forbiddenBits.value.toByte() != 0.toByte()) return null
/** Obtain the place from which the desired segment will certainly be reachable. */
Expand All @@ -115,13 +85,21 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment(
* to observe the new segment and either break the cell where [node] wants to arrive or process the [node].
* In any case, we have linearizable behavior. */
return if (segment.tryAdd(node, permissionsBitmask = permissionsBitmask, indexInSegment = indexInSegment)) {
Address(segment, indexInSegment)
index
} else {
null
}
}

override val head: LockFreeLinkedListHead get() = this
fun remove(node: LockFreeLinkedListNode) {
val address = node.address
val id = address / SEGMENT_SIZE
var segment: LockFreeLinkedListSegment = this
while (segment.id < id) { segment = segment.next!! }
if (segment.id == id) {
segment.clearSlot((address % SEGMENT_SIZE).toInt(), node)
}
}
}

internal open class LockFreeLinkedListSegment(
Expand All @@ -135,15 +113,15 @@ internal open class LockFreeLinkedListSegment(

override val numberOfSlots: Int get() = SEGMENT_SIZE

fun clearSlot(index: Int) {
cells[index].value = null
onSlotCleaned()
fun clearSlot(index: Int, node: LockFreeLinkedListNode) {
if (cells[index].compareAndSet(node, null))
onSlotCleaned()
}

inline fun forEach(forbidBitmask: Byte, startIndex: Int, block: (LockFreeLinkedListNode) -> Unit) {
inline fun forEach(forbidBitmask: Byte, startIndex: Int, block: (LockFreeLinkedListNode, LockFreeLinkedListSegment, Int) -> Unit) {
for (i in startIndex until SEGMENT_SIZE) {
val node = breakCellOrGetValue(forbidBitmask, i)
if (node != null) block(node)
if (node != null) block(node, this, i)
}
}

Expand Down Expand Up @@ -183,12 +161,8 @@ internal open class LockFreeLinkedListSegment(
override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
throw UnsupportedOperationException("Cancellation is not supported on LockFreeLinkedList")
}

open val head: LockFreeLinkedListHead get() = prev!!.head
}

internal class Address(@JvmField val segment: LockFreeLinkedListSegment, @JvmField val index: Int)

private fun createSegment(id: Long, prev: LockFreeLinkedListSegment): LockFreeLinkedListSegment =
LockFreeLinkedListSegment(
id = id,
Expand Down
8 changes: 4 additions & 4 deletions kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ class MemoryFootprintTest : TestBase(true) {

@Test
fun testJobSize() {
assertTotalSize(jobWithChildren(1), 112)
assertTotalSize(jobWithChildren(2), 336) // originally: 192
assertTotalSize(jobWithChildren(3), 416) // originally: 248
assertTotalSize(jobWithChildren(4), 496) // originally: 304
assertTotalSize(jobWithChildren(1), 120) // originally: 112
assertTotalSize(jobWithChildren(2), 304) // originally: 192
assertTotalSize(jobWithChildren(3), 368) // originally: 248
assertTotalSize(jobWithChildren(4), 432) // originally: 304
}

private fun jobWithChildren(numberOfChildren: Int): Job {
Expand Down

0 comments on commit 03a42b9

Please sign in to comment.