Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(model-server): deadlock caused by non-existing lock ordering #1235

Merged
merged 4 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions model-server/src/main/kotlin/org/modelix/model/server/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.modelix.model.server.handlers.ui.RepositoryOverview
import org.modelix.model.server.store.IgniteStoreClient
import org.modelix.model.server.store.InMemoryStoreClient
import org.modelix.model.server.store.IsolatingStore
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.forGlobalRepository
import org.modelix.model.server.store.loadDump
import org.modelix.model.server.store.writeDump
Expand Down Expand Up @@ -149,9 +150,12 @@ object Main {
}
var i = 0
val globalStoreClient = storeClient.forGlobalRepository()
while (i < cmdLineArgs.setValues.size) {
globalStoreClient.put(cmdLineArgs.setValues[i], cmdLineArgs.setValues[i + 1])
i += 2
@OptIn(RequiresTransaction::class)
globalStoreClient.getTransactionManager().runWrite {
while (i < cmdLineArgs.setValues.size) {
globalStoreClient.put(cmdLineArgs.setValues[i], cmdLineArgs.setValues[i + 1])
i += 2
}
}
val repositoriesManager = RepositoriesManager(storeClient)
val modelServer = KeyValueLikeModelServer(repositoriesManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.ktor.server.application.call
import io.ktor.server.response.respondText
import io.ktor.util.pipeline.PipelineContext
import org.modelix.model.server.handlers.KeyValueLikeModelServer.Companion.PROTECTED_PREFIX
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager

class HealthApiImpl(
Expand All @@ -25,9 +26,12 @@ class HealthApiImpl(

private fun isHealthy(): Boolean {
val store = stores.getGlobalStoreClient()
val value = toLong(store[HEALTH_KEY]) + 1
store.put(HEALTH_KEY, java.lang.Long.toString(value))
return toLong(store[HEALTH_KEY]) >= value
@OptIn(RequiresTransaction::class)
return store.getTransactionManager().runWrite {
val value = toLong(store[HEALTH_KEY]) + 1
store.put(HEALTH_KEY, java.lang.Long.toString(value))
toLong(store[HEALTH_KEY]) >= value
}
}

private fun toLong(value: String?): Long {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.modelix.model.lazy.IDeserializingKeyValueStore
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.server.store.IStoreClient
import org.modelix.model.server.store.ITransactionManager
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager

interface IRepositoriesManager {
Expand All @@ -18,31 +20,38 @@
* If the server ID was created previously but is only stored under a legacy database key,
* it also gets stored under the current and all legacy database keys.
*/
suspend fun maybeInitAndGetSeverId(): String
@RequiresTransaction
fun maybeInitAndGetSeverId(): String

@RequiresTransaction
fun getRepositories(): Set<RepositoryId>
odzhychko marked this conversation as resolved.
Show resolved Hide resolved
suspend fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion
suspend fun removeRepository(repository: RepositoryId): Boolean

fun getBranches(repositoryId: RepositoryId): Set<BranchReference>
@RequiresTransaction
fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion

Check warning

Code scanning / detekt

The function createRepository is missing documentation. Warning

The function createRepository is missing documentation.

suspend fun removeBranches(repository: RepositoryId, branchNames: Set<String>)
@RequiresTransaction
fun removeRepository(repository: RepositoryId): Boolean

Check warning

Code scanning / detekt

The function removeRepository is missing documentation. Warning

The function removeRepository is missing documentation.

@RequiresTransaction
fun getBranches(repositoryId: RepositoryId): Set<BranchReference>

Check warning

Code scanning / detekt

The function getBranches is missing documentation. Warning

The function getBranches is missing documentation.

/**
* Same as [removeBranches] but blocking.
* Caller is expected to execute it outside the request thread.
*/
fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set<String>)
suspend fun getVersion(branch: BranchReference): CLVersion?
suspend fun getVersion(repository: RepositoryId, versionHash: String): CLVersion?
suspend fun getVersionHash(branch: BranchReference): String?
@RequiresTransaction
fun removeBranches(repository: RepositoryId, branchNames: Set<String>)

@RequiresTransaction
fun getVersion(branch: BranchReference): CLVersion?

Check warning

Code scanning / detekt

The function getVersion is missing documentation. Warning

The function getVersion is missing documentation.
fun getVersion(repository: RepositoryId, versionHash: String): CLVersion?

Check warning

Code scanning / detekt

The function getVersion is missing documentation. Warning

The function getVersion is missing documentation.

@RequiresTransaction
fun getVersionHash(branch: BranchReference): String?

Check warning

Code scanning / detekt

The function getVersionHash is missing documentation. Warning

The function getVersionHash is missing documentation.
suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String
suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String

/**
* Same as [mergeChanges] but blocking.
* Caller is expected to execute it outside the request thread.
*/
fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String
@RequiresTransaction
fun mergeChanges(branch: BranchReference, newVersionHash: String): String

Check warning

Code scanning / detekt

The function mergeChanges is missing documentation. Warning

The function mergeChanges is missing documentation.
suspend fun computeDelta(repository: RepositoryId?, versionHash: String, baseVersionHash: String?): ObjectData

/**
Expand All @@ -54,14 +63,16 @@
fun isIsolated(repository: RepositoryId): Boolean?

fun getStoreManager(): StoreManager
fun getTransactionManager(): ITransactionManager

Check warning

Code scanning / detekt

The function getTransactionManager is missing documentation. Warning

The function getTransactionManager is missing documentation.
}

@RequiresTransaction
fun IRepositoriesManager.getBranchNames(repositoryId: RepositoryId): Set<String> {
return getBranches(repositoryId).map { it.branchName }.toSet()
}

fun IRepositoriesManager.getStoreClient(repository: RepositoryId?): IStoreClient {
return getStoreManager().getStoreClient(repository?.takeIf { isIsolated(it) ?: false })
fun IRepositoriesManager.getStoreClient(repository: RepositoryId?, immutable: Boolean): IStoreClient {

Check warning

Code scanning / detekt

The function getStoreClient is missing documentation. Warning

The function getStoreClient is missing documentation.
return getStoreManager().getStoreClient(repository?.takeIf { isIsolated(it) ?: false }, immutable)
}

fun IRepositoriesManager.getAsyncStore(repository: RepositoryId?): IAsyncObjectStore {
odzhychko marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import io.ktor.server.routing.routing
import io.ktor.util.pipeline.PipelineContext
import org.modelix.authorization.getUserName
import org.modelix.authorization.requiresLogin
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.runReadIO

/**
* Implementation of the REST API that is responsible for handling client and server IDs.
Expand All @@ -24,7 +26,11 @@ class IdsApiImpl(
//
// Functionally, it does not matter if the server ID is created eagerly or lazily,
// as long as the same server ID is returned from the same server.
val serverId = repositoriesManager.maybeInitAndGetSeverId()
val serverId =
@OptIn(RequiresTransaction::class)
repositoriesManager.getTransactionManager().runReadIO {
repositoriesManager.maybeInitAndGetSeverId()
}
call.respondText(serverId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.ktor.server.response.respondText
import io.ktor.server.routing.routing
import io.ktor.util.pipeline.PipelineContext
import kotlinx.coroutines.runBlocking
import kotlinx.html.br
import kotlinx.html.div
import kotlinx.html.h1
Expand All @@ -29,9 +28,11 @@
import org.modelix.model.persistent.HashUtil
import org.modelix.model.server.ModelServerPermissionSchema
import org.modelix.model.server.store.ObjectInRepository
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager
import org.modelix.model.server.store.pollEntry
import org.modelix.model.server.store.runTransactionSuspendable
import org.modelix.model.server.store.runReadIO
import org.modelix.model.server.store.runWriteIO
import org.modelix.model.server.templates.PageWithMenuBar
import java.io.IOException
import java.util.*
Expand Down Expand Up @@ -61,7 +62,8 @@
// request to initialize it lazily, would make the code less robust.
// Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need
// the special conditions in the affected requests to be updated.
runBlocking { repositoriesManager.maybeInitAndGetSeverId() }
@OptIn(RequiresTransaction::class)
repositoriesManager.getTransactionManager().runWrite { repositoriesManager.maybeInitAndGetSeverId() }
application.apply {
modelServerModule()
}
Expand Down Expand Up @@ -89,7 +91,8 @@
get<Paths.getKeyGet> {
val key = call.parameters["key"]!!
checkKeyPermission(key, EPermissionType.READ)
val value = stores.getGlobalKeyValueStore()[key]
@OptIn(RequiresTransaction::class)
val value = runRead { stores.getGlobalStoreClient()[key] }
respondValue(key, value)
}
get<Paths.pollKeyGet> {
Expand All @@ -106,21 +109,25 @@
post<Paths.counterKeyPost> {
val key = call.parameters["key"]!!
checkKeyPermission(key, EPermissionType.WRITE)
val value = stores.getGlobalStoreClient().generateId(key)
val value = stores.getGlobalStoreClient(false).generateId(key)
call.respondText(text = value.toString())
}

get<Paths.getRecursivelyKeyGet> {
val key = call.parameters["key"]!!
checkKeyPermission(key, EPermissionType.READ)
call.respondText(collect(key, this).toString(2), contentType = ContentType.Application.Json)
@OptIn(RequiresTransaction::class)
call.respondText(runRead { collect(key, this) }.toString(2), contentType = ContentType.Application.Json)
}

put<Paths.putKeyPut> {
val key = call.parameters["key"]!!
val value = call.receiveText()
try {
putEntries(mapOf(key to value))
@OptIn(RequiresTransaction::class)
runWrite {
putEntries(mapOf(key to value))
}
call.respondText("OK")
} catch (e: NotFoundException) {
throw HttpException(HttpStatusCode.NotFound, title = "Not found", details = e.message, cause = e)
Expand All @@ -139,7 +146,10 @@
}
entries = sortByDependency(entries)
try {
putEntries(entries)
@OptIn(RequiresTransaction::class)
runWrite {
putEntries(entries)
}
call.respondText(entries.size.toString() + " entries written")
} catch (e: NotFoundException) {
throw HttpException(HttpStatusCode.NotFound, title = "Not found", details = e.message, cause = e)
Expand All @@ -158,7 +168,8 @@
checkKeyPermission(key, EPermissionType.READ)
keys.add(key)
}
val values = stores.getGlobalStoreClient().getAll(keys)
@OptIn(RequiresTransaction::class)
val values = runRead { stores.getGlobalStoreClient(false).getAll(keys) }
for (i in keys.indices) {
val respEntry = JSONObject()
respEntry.put("key", keys[i])
Expand Down Expand Up @@ -199,6 +210,7 @@
return sorted
}

@RequiresTransaction
fun collect(rootKey: String, callContext: CallContext?): JSONArray {
val result = JSONArray()
val processed: MutableSet<String> = HashSet()
Expand All @@ -210,7 +222,7 @@
if (callContext != null) {
keys.forEach { callContext.checkKeyPermission(it, EPermissionType.READ) }
}
val values = stores.getGlobalStoreClient().getAll(keys)
val values = stores.getGlobalStoreClient(false).getAll(keys)
for (i in keys.indices) {
val key = keys[i]
val value = values[i]
Expand Down Expand Up @@ -240,7 +252,8 @@
return result
}

private suspend fun CallContext.putEntries(newEntries: Map<String, String?>) {
@RequiresTransaction
private fun CallContext.putEntries(newEntries: Map<String, String?>) {

Check warning

Code scanning / detekt

Function putEntries is nested too deeply. Warning

Function putEntries is nested too deeply.

Check warning

Code scanning / detekt

The function putEntries is too long (61). The maximum length is 60. Warning

The function putEntries is too long (61). The maximum length is 60.
val referencedKeys: MutableSet<String> = HashSet()
for ((key, value) in newEntries) {
checkKeyPermission(key, EPermissionType.WRITE)
Expand Down Expand Up @@ -300,17 +313,15 @@
// We could try to move the objects later, but since this API is deprecated, it's not worth the effort.
}

stores.getGlobalStoreClient().runTransactionSuspendable {
stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository.global(it.key) })
stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository.global(it.key) })
for ((branch, value) in branchChanges) {
if (value == null) {
checkPermission(ModelServerPermissionSchema.branch(branch).delete)
repositoriesManager.removeBranchesBlocking(branch.repositoryId, setOf(branch.branchName))
} else {
checkPermission(ModelServerPermissionSchema.branch(branch).push)
repositoriesManager.mergeChangesBlocking(branch, value)
}
stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository.global(it.key) })
odzhychko marked this conversation as resolved.
Show resolved Hide resolved
stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository.global(it.key) })
for ((branch, value) in branchChanges) {
if (value == null) {
checkPermission(ModelServerPermissionSchema.branch(branch).delete)
repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName))
} else {
checkPermission(ModelServerPermissionSchema.branch(branch).push)
repositoriesManager.mergeChanges(branch, value)
}
}
}
Expand Down Expand Up @@ -363,4 +374,12 @@
else -> unknown()
}
}

private suspend fun <R> runRead(body: () -> R): R {
return repositoriesManager.getTransactionManager().runReadIO(body)
}

private suspend fun <R> runWrite(body: () -> R): R {
return repositoriesManager.getTransactionManager().runWriteIO(body)
}
}
Loading
Loading