Skip to content

Commit

Permalink
fix(model-server): @RequiresTransaction annotation to help avoiding M…
Browse files Browse the repository at this point in the history
…issingTransactionException
  • Loading branch information
slisson committed Dec 12, 2024
1 parent c1df7be commit 110b426
Show file tree
Hide file tree
Showing 32 changed files with 529 additions and 255 deletions.
10 changes: 7 additions & 3 deletions model-server/src/main/kotlin/org/modelix/model/server/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.modelix.model.server.handlers.ui.RepositoryOverview
import org.modelix.model.server.store.IgniteStoreClient
import org.modelix.model.server.store.InMemoryStoreClient
import org.modelix.model.server.store.IsolatingStore
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.forGlobalRepository
import org.modelix.model.server.store.loadDump
import org.modelix.model.server.store.writeDump
Expand Down Expand Up @@ -149,9 +150,12 @@ object Main {
}
var i = 0
val globalStoreClient = storeClient.forGlobalRepository()
while (i < cmdLineArgs.setValues.size) {
globalStoreClient.put(cmdLineArgs.setValues[i], cmdLineArgs.setValues[i + 1])
i += 2
@OptIn(RequiresTransaction::class)
globalStoreClient.getTransactionManager().runWrite {
while (i < cmdLineArgs.setValues.size) {
globalStoreClient.put(cmdLineArgs.setValues[i], cmdLineArgs.setValues[i + 1])
i += 2
}
}
val repositoriesManager = RepositoriesManager(storeClient)
val modelServer = KeyValueLikeModelServer(repositoriesManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.ktor.server.application.call
import io.ktor.server.response.respondText
import io.ktor.util.pipeline.PipelineContext
import org.modelix.model.server.handlers.KeyValueLikeModelServer.Companion.PROTECTED_PREFIX
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager

class HealthApiImpl(
Expand All @@ -25,6 +26,7 @@ class HealthApiImpl(

private fun isHealthy(): Boolean {
val store = stores.getGlobalStoreClient()
@OptIn(RequiresTransaction::class)
return store.getTransactionManager().runWrite {
val value = toLong(store[HEALTH_KEY]) + 1
store.put(HEALTH_KEY, java.lang.Long.toString(value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.modelix.model.lazy.IDeserializingKeyValueStore
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.server.store.IStoreClient
import org.modelix.model.server.store.ITransactionManager
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager

interface IRepositoriesManager {
Expand All @@ -19,29 +20,38 @@ interface IRepositoriesManager {
* If the server ID was created previously but is only stored under a legacy database key,
* it also gets stored under the current and all legacy database keys.
*/
@RequiresTransaction
fun maybeInitAndGetSeverId(): String

@RequiresTransaction
fun getRepositories(): Set<RepositoryId>

@RequiresTransaction
fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion

@RequiresTransaction
fun removeRepository(repository: RepositoryId): Boolean

@RequiresTransaction
fun getBranches(repositoryId: RepositoryId): Set<BranchReference>

/**
* Same as [removeBranches] but blocking.
* Caller is expected to execute it outside the request thread.
*/
@RequiresTransaction
fun removeBranches(repository: RepositoryId, branchNames: Set<String>)

@RequiresTransaction
fun getVersion(branch: BranchReference): CLVersion?
fun getVersion(repository: RepositoryId, versionHash: String): CLVersion?

@RequiresTransaction
fun getVersionHash(branch: BranchReference): String?
suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String
fun mergeChanges(branch: BranchReference, newVersionHash: String): String

/**
* Same as [mergeChanges] but blocking.
* Caller is expected to execute it outside the request thread.
*/
fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String
@RequiresTransaction
fun mergeChanges(branch: BranchReference, newVersionHash: String): String
suspend fun computeDelta(repository: RepositoryId?, versionHash: String, baseVersionHash: String?): ObjectData

/**
Expand All @@ -56,6 +66,7 @@ interface IRepositoriesManager {
fun getTransactionManager(): ITransactionManager
}

@RequiresTransaction
fun IRepositoriesManager.getBranchNames(repositoryId: RepositoryId): Set<String> {
return getBranches(repositoryId).map { it.branchName }.toSet()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import io.ktor.server.routing.routing
import io.ktor.util.pipeline.PipelineContext
import org.modelix.authorization.getUserName
import org.modelix.authorization.requiresLogin
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.runReadIO

/**
* Implementation of the REST API that is responsible for handling client and server IDs.
Expand All @@ -24,7 +26,11 @@ class IdsApiImpl(
//
// Functionally, it does not matter if the server ID is created eagerly or lazily,
// as long as the same server ID is returned from the same server.
val serverId = repositoriesManager.maybeInitAndGetSeverId()
val serverId =
@OptIn(RequiresTransaction::class)
repositoriesManager.getTransactionManager().runReadIO {
repositoriesManager.maybeInitAndGetSeverId()
}
call.respondText(serverId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.modelix.model.lazy.BranchReference
import org.modelix.model.persistent.HashUtil
import org.modelix.model.server.ModelServerPermissionSchema
import org.modelix.model.server.store.ObjectInRepository
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager
import org.modelix.model.server.store.pollEntry
import org.modelix.model.server.store.runReadIO
Expand Down Expand Up @@ -61,6 +62,7 @@ class KeyValueLikeModelServer(
// request to initialize it lazily, would make the code less robust.
// Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need
// the special conditions in the affected requests to be updated.
@OptIn(RequiresTransaction::class)
repositoriesManager.getTransactionManager().runWrite { repositoriesManager.maybeInitAndGetSeverId() }
application.apply {
modelServerModule()
Expand Down Expand Up @@ -89,6 +91,7 @@ class KeyValueLikeModelServer(
get<Paths.getKeyGet> {
val key = call.parameters["key"]!!
checkKeyPermission(key, EPermissionType.READ)
@OptIn(RequiresTransaction::class)
val value = runRead { stores.getGlobalStoreClient()[key] }
respondValue(key, value)
}
Expand All @@ -113,13 +116,15 @@ class KeyValueLikeModelServer(
get<Paths.getRecursivelyKeyGet> {
val key = call.parameters["key"]!!
checkKeyPermission(key, EPermissionType.READ)
@OptIn(RequiresTransaction::class)
call.respondText(runRead { collect(key, this) }.toString(2), contentType = ContentType.Application.Json)
}

put<Paths.putKeyPut> {
val key = call.parameters["key"]!!
val value = call.receiveText()
try {
@OptIn(RequiresTransaction::class)
runWrite {
putEntries(mapOf(key to value))
}
Expand All @@ -141,6 +146,7 @@ class KeyValueLikeModelServer(
}
entries = sortByDependency(entries)
try {
@OptIn(RequiresTransaction::class)
runWrite {
putEntries(entries)
}
Expand All @@ -162,6 +168,7 @@ class KeyValueLikeModelServer(
checkKeyPermission(key, EPermissionType.READ)
keys.add(key)
}
@OptIn(RequiresTransaction::class)
val values = runRead { stores.getGlobalStoreClient(false).getAll(keys) }
for (i in keys.indices) {
val respEntry = JSONObject()
Expand Down Expand Up @@ -203,6 +210,7 @@ class KeyValueLikeModelServer(
return sorted
}

@RequiresTransaction
fun collect(rootKey: String, callContext: CallContext?): JSONArray {
val result = JSONArray()
val processed: MutableSet<String> = HashSet()
Expand Down Expand Up @@ -244,6 +252,7 @@ class KeyValueLikeModelServer(
return result
}

@RequiresTransaction
private fun CallContext.putEntries(newEntries: Map<String, String?>) {
val referencedKeys: MutableSet<String> = HashSet()
for ((key, value) in newEntries) {
Expand Down Expand Up @@ -312,7 +321,7 @@ class KeyValueLikeModelServer(
repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName))
} else {
checkPermission(ModelServerPermissionSchema.branch(branch).push)
repositoriesManager.mergeChangesBlocking(branch, value)
repositoriesManager.mergeChanges(branch, value)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import org.modelix.model.server.api.v2.ImmutableObjectsStream
import org.modelix.model.server.api.v2.VersionDelta
import org.modelix.model.server.api.v2.VersionDeltaStream
import org.modelix.model.server.api.v2.VersionDeltaStreamV2
import org.modelix.model.server.store.RequiresTransaction
import org.modelix.model.server.store.StoreManager
import org.modelix.model.server.store.runReadIO
import org.modelix.model.server.store.runWriteIO
Expand Down Expand Up @@ -91,6 +92,7 @@ class ModelReplicationServer(

override suspend fun PipelineContext<Unit, ApplicationCall>.getRepositories() {
call.respondText(
@OptIn(RequiresTransaction::class)
runRead { repositoriesManager.getRepositories() }
.filter { call.hasPermission(ModelServerPermissionSchema.repository(it).list) }
.joinToString("\n") { it.id },
Expand All @@ -99,6 +101,7 @@ class ModelReplicationServer(

override suspend fun PipelineContext<Unit, ApplicationCall>.getRepositoryBranches(repository: String) {
call.respondText(
@OptIn(RequiresTransaction::class)
runRead { repositoriesManager.getBranchNames(repositoryId(repository)) }
.filter { call.hasPermission(ModelServerPermissionSchema.repository(repository).branch(it).list) }
.joinToString("\n"),
Expand All @@ -112,6 +115,8 @@ class ModelReplicationServer(
) {
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
val branchRef = repositoryId(repository).getBranchReference(branch)

@OptIn(RequiresTransaction::class)
val versionHash = runRead {
repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
}
Expand All @@ -125,7 +130,11 @@ class ModelReplicationServer(
) {
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
val branchRef = repositoryId(repository).getBranchReference(branch)
val versionHash = runRead { repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) }

@OptIn(RequiresTransaction::class)
val versionHash = runRead {
repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
}
call.respond(BranchV1(branch, versionHash))
}

Expand All @@ -141,6 +150,7 @@ class ModelReplicationServer(

checkPermission(ModelServerPermissionSchema.repository(repositoryId).branch(branch).delete)

@OptIn(RequiresTransaction::class)
runWrite {
if (!repositoriesManager.getBranchNames(repositoryId).contains(branch)) {
throw BranchNotFoundException(branch, repositoryId.id)
Expand All @@ -158,7 +168,11 @@ class ModelReplicationServer(
) {
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
val branchRef = repositoryId(repository).getBranchReference(branch)
val versionHash = runRead { repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) }

@OptIn(RequiresTransaction::class)
val versionHash = runRead {
repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
}
call.respondText(versionHash)
}

Expand All @@ -168,6 +182,7 @@ class ModelReplicationServer(
legacyGlobalStorage: Boolean?,
) {
checkPermission(ModelServerPermissionSchema.repository(repository).create)
@OptIn(RequiresTransaction::class)
val initialVersion = runWrite {
repositoriesManager.createRepository(
repositoryId(repository),
Expand All @@ -182,6 +197,7 @@ class ModelReplicationServer(
override suspend fun PipelineContext<Unit, ApplicationCall>.deleteRepository(repository: String) {
checkPermission(ModelServerPermissionSchema.repository(repository).delete)

@OptIn(RequiresTransaction::class)
val foundAndDeleted = runWrite {
repositoriesManager.removeRepository(repositoryId(repository))
}
Expand All @@ -200,7 +216,9 @@ class ModelReplicationServer(
val branchRef = repositoryId(repository).getBranchReference(branch)
val deltaFromClient = call.receive<VersionDelta>()
deltaFromClient.checkObjectHashes()
@OptIn(RequiresTransaction::class) // no transactions required for immutable store
repositoriesManager.getStoreClient(RepositoryId(repository), true).putAll(deltaFromClient.getAllObjects())
@OptIn(RequiresTransaction::class)
val mergedHash = runWrite {
repositoriesManager.mergeChanges(branchRef, deltaFromClient.versionHash)
}
Expand Down Expand Up @@ -228,6 +246,7 @@ class ModelReplicationServer(
}

val objects = withContext(Dispatchers.IO) {
@OptIn(RequiresTransaction::class) // no transactions required for immutable store
repositoriesManager.getStoreClient(RepositoryId(repository), true).getAll(keys)
}

Expand Down Expand Up @@ -271,6 +290,7 @@ class ModelReplicationServer(
) {
val branchRef = repositoryId(repository).getBranchReference(branchName)
checkPermission(ModelServerPermissionSchema.branch(branchRef).query)
@OptIn(RequiresTransaction::class)
val version = runRead { repositoriesManager.getVersion(branchRef) ?: throw BranchNotFoundException(branchRef) }
LOG.trace("Running query on {} @ {}", branchRef, version)
val initialTree = version.getTree()
Expand Down Expand Up @@ -301,6 +321,7 @@ class ModelReplicationServer(
baseVersion = version,
operations = ops.map { it.getOriginalOp() }.toTypedArray(),
)
@OptIn(RequiresTransaction::class)
runWrite {
repositoriesManager.mergeChanges(branchRef, newVersion.getContentHash())
}
Expand Down Expand Up @@ -343,6 +364,7 @@ class ModelReplicationServer(
}

withContext(Dispatchers.IO) {
@OptIn(RequiresTransaction::class) // no transactions required for immutable store
repositoriesManager.getStoreClient(RepositoryId(repository), true).putAll(entries, true)
}
call.respondText("${entries.size} objects received")
Expand All @@ -354,6 +376,7 @@ class ModelReplicationServer(
lastKnown: String?,
) {
checkPermission(ModelServerPermissionSchema.legacyGlobalObjects.read)
@OptIn(RequiresTransaction::class)
if (runRead { stores.getGlobalStoreClient()[versionHash] } == null) {
throw VersionNotFoundException(versionHash)
}
Expand Down
Loading

0 comments on commit 110b426

Please sign in to comment.