From 9ff4825fe6bcf558010e86716f91c918b9b1a266 Mon Sep 17 00:00:00 2001 From: Oleksandr Dzhychko Date: Tue, 2 Apr 2024 11:59:29 +0200 Subject: [PATCH 1/3] refactor(model-server): extract IRepositoriesManager interface to make route handlers testable --- .../kotlin/org/modelix/model/server/Main.kt | 6 +- .../model/server/handlers/ContentExplorer.kt | 2 +- .../model/server/handlers/HistoryHandler.kt | 2 +- .../server/handlers/IRepositoriesManager.kt | 62 +++++++++++++++++++ .../handlers/KeyValueLikeModelServer.kt | 17 +++-- .../server/handlers/ModelReplicationServer.kt | 26 +++++--- .../server/handlers/RepositoriesManager.kt | 36 +++++------ .../server/handlers/RepositoryOverview.kt | 2 +- .../model/server/PullPerformanceTest.kt | 6 +- .../model/server/ReplicatedRepositoryTest.kt | 10 ++- .../handlers/ModelReplicationServerTest.kt | 10 ++- 11 files changed, 129 insertions(+), 50 deletions(-) create mode 100644 model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt diff --git a/model-server/src/main/kotlin/org/modelix/model/server/Main.kt b/model-server/src/main/kotlin/org/modelix/model/server/Main.kt index 9e1d693b7f..e053919ef1 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/Main.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/Main.kt @@ -48,6 +48,7 @@ import org.apache.commons.io.FileUtils import org.apache.ignite.Ignition import org.modelix.authorization.KeycloakUtils import org.modelix.authorization.installAuthentication +import org.modelix.model.InMemoryModels import org.modelix.model.server.handlers.ContentExplorer import org.modelix.model.server.handlers.DeprecatedLightModelServer import org.modelix.model.server.handlers.HistoryHandler @@ -150,8 +151,9 @@ object Main { i += 2 } val localModelClient = LocalModelClient(storeClient) + val inMemoryModels = InMemoryModels() val repositoriesManager = RepositoriesManager(localModelClient) - val modelServer = KeyValueLikeModelServer(repositoriesManager) + val modelServer = KeyValueLikeModelServer(repositoriesManager, storeClient, inMemoryModels) val sharedSecretFile = cmdLineArgs.secretFile if (sharedSecretFile.exists()) { modelServer.setSharedSecret( @@ -162,7 +164,7 @@ object Main { val repositoryOverview = RepositoryOverview(repositoriesManager) val historyHandler = HistoryHandler(localModelClient, repositoriesManager) val contentExplorer = ContentExplorer(localModelClient, repositoriesManager) - val modelReplicationServer = ModelReplicationServer(repositoriesManager) + val modelReplicationServer = ModelReplicationServer(repositoriesManager, localModelClient, inMemoryModels) val metricsHandler = MetricsHandler() val configureNetty: NettyApplicationEngine.Configuration.() -> Unit = { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ContentExplorer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ContentExplorer.kt index 9f105746e4..5688464cba 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ContentExplorer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ContentExplorer.kt @@ -50,7 +50,7 @@ import org.modelix.model.lazy.RepositoryId import org.modelix.model.server.templates.PageWithMenuBar import kotlin.collections.set -class ContentExplorer(private val client: IModelClient, private val repoManager: RepositoriesManager) { +class ContentExplorer(private val client: IModelClient, private val repoManager: IRepositoriesManager) { fun init(application: Application) { application.routing { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt index 470492dabc..0be47b087b 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt @@ -55,7 +55,7 @@ import org.modelix.model.server.templates.PageWithMenuBar import java.time.LocalDateTime import java.time.format.DateTimeFormatter -class HistoryHandler(val client: IModelClient, private val repositoriesManager: RepositoriesManager) { +class HistoryHandler(val client: IModelClient, private val repositoriesManager: IRepositoriesManager) { fun init(application: Application) { application.routing { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt new file mode 100644 index 0000000000..41d9cce4f2 --- /dev/null +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.model.server.handlers + +import org.modelix.model.lazy.BranchReference +import org.modelix.model.lazy.CLVersion +import org.modelix.model.lazy.RepositoryId + +interface IRepositoriesManager { + /** + * Used to retrieve the server ID. If needed, the server ID is created and stored. + * + * If a server ID was not created yet, it is generated and saved in the database. + * It gets stored under the current and all legacy database keys. + * + * If the server ID was created previously but is only stored under a legacy database key, + * it also gets stored under the current and all legacy database keys. + */ + suspend fun maybeInitAndGetSeverId(): String + fun getRepositories(): Set + suspend fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true): CLVersion + suspend fun removeRepository(repository: RepositoryId): Boolean + + fun getBranches(repositoryId: RepositoryId): Set + + suspend fun removeBranches(repository: RepositoryId, branchNames: Set) + + /** + * Same as [removeBranches] but blocking. + * Caller is expected to execute it outside the request thread. + */ + fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set) + suspend fun getVersion(branch: BranchReference): CLVersion? + suspend fun getVersionHash(branch: BranchReference): String? + suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String + suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String + + /** + * Same as [mergeChanges] but blocking. + * Caller is expected to execute it outside the request thread. + */ + fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String + suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData +} + +fun IRepositoriesManager.getBranchNames(repositoryId: RepositoryId): Set { + return getBranches(repositoryId).map { it.branchName }.toSet() +} diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt index 925c52aa52..99a0a07843 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt @@ -44,6 +44,7 @@ import org.modelix.authorization.checkPermission import org.modelix.authorization.getUserName import org.modelix.authorization.requiresPermission import org.modelix.authorization.toKeycloakScope +import org.modelix.model.InMemoryModels import org.modelix.model.lazy.BranchReference import org.modelix.model.lazy.RepositoryId import org.modelix.model.persistent.HashUtil @@ -67,7 +68,15 @@ private class NotFoundException(description: String?) : RuntimeException(descrip typealias CallContext = PipelineContext -class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { +class KeyValueLikeModelServer( + private val repositoriesManager: IRepositoriesManager, + private val storeClient: IStoreClient, + private val inMemoryModels: InMemoryModels, +) { + + constructor(repositoriesManager: RepositoriesManager) : + this(repositoriesManager, repositoriesManager.client.store, InMemoryModels()) + companion object { private val LOG = LoggerFactory.getLogger(KeyValueLikeModelServer::class.java) val HASH_PATTERN = Pattern.compile("[a-zA-Z0-9\\-_]{5}\\*[a-zA-Z0-9\\-_]{38}") @@ -75,8 +84,6 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { val HEALTH_KEY = PROTECTED_PREFIX + "health2" } - val storeClient: IStoreClient get() = repositoriesManager.client.store - fun init(application: Application) { // Functionally, it does not matter if the server ID // is created eagerly on startup or lazily on the first request, @@ -100,7 +107,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { ?.getBranchReference(System.getenv("MODELIX_SERVER_MODELQL_WARMUP_BRANCH")) if (branchRef != null) { val version = repositoriesManager.getVersion(branchRef) - if (repositoriesManager.inMemoryModels.getModel(version!!.getTree()).isActive) { + if (inMemoryModels.getModel(version!!.getTree()).isActive) { call.respondText( status = HttpStatusCode.ServiceUnavailable, text = "Waiting for version $version to be loaded into memory", @@ -346,7 +353,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { HashUtil.checkObjectHashes(hashedObjects) - repositoriesManager.client.store.runTransactionSuspendable { + storeClient.runTransactionSuspendable { storeClient.putAll(hashedObjects) storeClient.putAll(userDefinedEntries) for ((branch, value) in branchChanges) { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt index b04b551826..8d02ff460e 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt @@ -46,6 +46,7 @@ import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import org.modelix.api.public.Paths import org.modelix.authorization.getUserName +import org.modelix.model.InMemoryModels import org.modelix.model.api.ITree import org.modelix.model.api.PBranch import org.modelix.model.api.TreePointer @@ -69,15 +70,20 @@ import org.slf4j.LoggerFactory * Implements the endpoints used by the 'model-client', but compared to KeyValueLikeModelServer also understands what * client sends. This allows more validations and more responsibilities on the server side. */ -class ModelReplicationServer(val repositoriesManager: RepositoriesManager) { - constructor(modelClient: LocalModelClient) : this(RepositoriesManager(modelClient)) +class ModelReplicationServer( + private val repositoriesManager: IRepositoriesManager, + private val modelClient: LocalModelClient, + private val inMemoryModels: InMemoryModels, +) { + constructor(repositoriesManager: RepositoriesManager) : + this(repositoriesManager, repositoriesManager.client, InMemoryModels()) + + constructor(modelClient: LocalModelClient) : this(RepositoriesManager(modelClient), modelClient, InMemoryModels()) constructor(storeClient: IStoreClient) : this(LocalModelClient(storeClient)) companion object { private val LOG = LoggerFactory.getLogger(ModelReplicationServer::class.java) } - - private val modelClient: LocalModelClient get() = repositoriesManager.client private val storeClient: IStoreClient get() = modelClient.store fun init(application: Application) { @@ -268,16 +274,16 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) { LOG.trace("Running query on {} @ {}", branchRef, version) val initialTree = version!!.getTree() val branch = OTBranch( - PBranch(initialTree, repositoriesManager.client.idGenerator), - repositoriesManager.client.idGenerator, - repositoriesManager.client.storeCache, + PBranch(initialTree, modelClient.idGenerator), + modelClient.idGenerator, + modelClient.storeCache, ) ModelQLServer.handleCall(call, { writeAccess -> if (writeAccess) { branch.getRootNode() to branch.getArea() } else { - val model = repositoriesManager.inMemoryModels.getModel(initialTree).await() + val model = inMemoryModels.getModel(initialTree).await() model.getNode(ITree.ROOT_ID) to model.getArea() } }, { @@ -286,7 +292,7 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) { val (ops, newTree) = branch.getPendingChanges() if (newTree != initialTree) { val newVersion = CLVersion.createRegularVersion( - id = repositoriesManager.client.idGenerator.generate(), + id = modelClient.idGenerator.generate(), author = getUserName(), tree = newTree as CLTree, baseVersion = version, @@ -299,7 +305,7 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) { post { val versionHash = call.parameters["versionHash"]!! - val version = CLVersion.loadFromHash(versionHash, repositoriesManager.client.storeCache) + val version = CLVersion.loadFromHash(versionHash, modelClient.storeCache) val initialTree = version.getTree() val branch = TreePointer(initialTree) ModelQLServer.handleCall(call, branch.getRootNode(), branch.getArea()) diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index f4c6ef48e5..64ad29fb04 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -25,7 +25,6 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.datetime.Clock import org.apache.commons.collections4.map.LRUMap -import org.modelix.model.InMemoryModels import org.modelix.model.ModelMigrations import org.modelix.model.VersionMerger import org.modelix.model.api.IBranch @@ -51,7 +50,7 @@ import org.slf4j.LoggerFactory import java.lang.ref.SoftReference import java.util.UUID -class RepositoriesManager(val client: LocalModelClient) { +class RepositoriesManager(val client: LocalModelClient) : IRepositoriesManager { init { fun migrateLegacyRepositoriesList(infoBranch: IBranch) { @@ -82,13 +81,6 @@ class RepositoriesManager(val client: LocalModelClient) { private val store: IStoreClient get() = client.store private val objectStore: IDeserializingKeyValueStore get() = client.storeCache - val inMemoryModels = InMemoryModels() - - fun dispose() { - // TODO find instance creations and add a dispose() call if needed. Whoever creates an instance is responsible - // for its lifecycle. - inMemoryModels.dispose() - } fun generateClientId(repositoryId: RepositoryId): Long { return client.store.generateId("$KEY_PREFIX:${repositoryId.id}:clientId") @@ -103,7 +95,7 @@ class RepositoriesManager(val client: LocalModelClient) { * If the server ID was created previously but is only stored under a legacy database key, * it also gets stored under the current and all legacy database keys. */ - suspend fun maybeInitAndGetSeverId(): String { + override suspend fun maybeInitAndGetSeverId(): String { return store.runTransactionSuspendable { var serverId = store[SERVER_ID_KEY] if (serverId == null) { @@ -118,7 +110,7 @@ class RepositoriesManager(val client: LocalModelClient) { } } - fun getRepositories(): Set { + override fun getRepositories(): Set { val repositoriesList = store[REPOSITORIES_LIST_KEY] val emptyRepositoriesList = repositoriesList.isNullOrBlank() return if (emptyRepositoriesList) { @@ -130,7 +122,7 @@ class RepositoriesManager(val client: LocalModelClient) { private fun repositoryExists(repositoryId: RepositoryId) = getRepositories().contains(repositoryId) - suspend fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true): CLVersion { + override suspend fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean): CLVersion { var initialVersion: CLVersion? = null store.runTransactionSuspendable { val masterBranch = repositoryId.getBranchReference() @@ -155,7 +147,7 @@ class RepositoriesManager(val client: LocalModelClient) { return store[branchListKey(repositoryId)]?.lines()?.toSet() ?: emptySet() } - fun getBranches(repositoryId: RepositoryId): Set { + override fun getBranches(repositoryId: RepositoryId): Set { return getBranchNames(repositoryId) .map { repositoryId.getBranchReference(it) } .sortedBy { it.branchName } @@ -188,7 +180,7 @@ class RepositoriesManager(val client: LocalModelClient) { } } - suspend fun removeRepository(repository: RepositoryId): Boolean { + override suspend fun removeRepository(repository: RepositoryId): Boolean { return store.runTransactionSuspendable { if (!repositoryExists(repository)) { return@runTransactionSuspendable false @@ -206,7 +198,7 @@ class RepositoriesManager(val client: LocalModelClient) { } } - suspend fun removeBranches(repository: RepositoryId, branchNames: Set) { + override suspend fun removeBranches(repository: RepositoryId, branchNames: Set) { return store.runTransactionSuspendable { removeBranchesBlocking(repository, branchNames) } @@ -216,7 +208,7 @@ class RepositoriesManager(val client: LocalModelClient) { * Same as [removeBranches] but blocking. * Caller is expected to execute it outside the request thread. */ - fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set) { + override fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set) { if (branchNames.isEmpty()) return store.runTransaction { val key = branchListKey(repository) @@ -229,7 +221,7 @@ class RepositoriesManager(val client: LocalModelClient) { } } - suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String { + override suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String { return store.runTransactionSuspendable { mergeChangesBlocking(branch, newVersionHash) } @@ -239,7 +231,7 @@ class RepositoriesManager(val client: LocalModelClient) { * Same as [mergeChanges] but blocking. * Caller is expected to execute it outside the request thread. */ - fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String { + override fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String { return store.runTransaction { val headHash = getVersionHashBlocking(branch) val mergedHash = if (headHash == null) { @@ -262,11 +254,11 @@ class RepositoriesManager(val client: LocalModelClient) { } } - suspend fun getVersion(branch: BranchReference): CLVersion? { + override suspend fun getVersion(branch: BranchReference): CLVersion? { return getVersionHash(branch)?.let { CLVersion.loadFromHash(it, client.storeCache) } } - suspend fun getVersionHash(branch: BranchReference): String? { + override suspend fun getVersionHash(branch: BranchReference): String? { return store.runTransactionSuspendable { getVersionHashBlocking(branch) } @@ -293,13 +285,13 @@ class RepositoriesManager(val client: LocalModelClient) { store.put(legacyBranchKey(branch), hash, false) } - suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String { + override suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String { return pollEntry(client.store, branchKey(branch), lastKnown) ?: throw IllegalStateException("No version found for branch '${branch.branchName}' in repository '${branch.repositoryId}'") } private val versionDeltaCache = VersionDeltaCache(client.storeCache) - suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData { + override suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData { if (versionHash == baseVersionHash) return ObjectData.empty if (baseVersionHash == null) { // no need to cache anything if there is no delta computation happening diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoryOverview.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoryOverview.kt index d225d0c1ef..359f09ee94 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoryOverview.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoryOverview.kt @@ -25,7 +25,7 @@ import kotlinx.html.tr import org.modelix.api.html.Paths import org.modelix.model.server.templates.PageWithMenuBar -class RepositoryOverview(private val repoManager: RepositoriesManager) { +class RepositoryOverview(private val repoManager: IRepositoriesManager) { fun init(application: Application) { application.routing { diff --git a/model-server/src/test/kotlin/org/modelix/model/server/PullPerformanceTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/PullPerformanceTest.kt index 31808f0348..c346329df4 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/PullPerformanceTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/PullPerformanceTest.kt @@ -20,6 +20,7 @@ import io.ktor.server.testing.ApplicationTestBuilder import io.ktor.server.testing.testApplication import kotlinx.coroutines.coroutineScope import org.modelix.authorization.installAuthentication +import org.modelix.model.InMemoryModels import org.modelix.model.api.IChildLink import org.modelix.model.api.IConceptReference import org.modelix.model.api.INode @@ -40,11 +41,12 @@ class PullPerformanceTest { private fun runTest(block: suspend ApplicationTestBuilder.(storeClientWithStatistics: StoreClientWithStatistics) -> Unit) = testApplication { val storeClientWithStatistics = StoreClientWithStatistics(InMemoryStoreClient()) val repositoriesManager = RepositoriesManager(LocalModelClient(storeClientWithStatistics)) + val inMemoryModels = InMemoryModels() application { installAuthentication(unitTestMode = true) installDefaultServerPlugins() - ModelReplicationServer(repositoriesManager).init(this) - KeyValueLikeModelServer(repositoriesManager).init(this) + ModelReplicationServer(repositoriesManager, LocalModelClient(storeClientWithStatistics), inMemoryModels).init(this) + KeyValueLikeModelServer(repositoriesManager, storeClientWithStatistics, inMemoryModels).init(this) } coroutineScope { diff --git a/model-server/src/test/kotlin/org/modelix/model/server/ReplicatedRepositoryTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/ReplicatedRepositoryTest.kt index aa15e8116c..7630ee50f1 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/ReplicatedRepositoryTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/ReplicatedRepositoryTest.kt @@ -25,6 +25,7 @@ import kotlinx.coroutines.withTimeout import org.junit.jupiter.api.RepeatedTest import org.junit.jupiter.api.RepetitionInfo import org.modelix.authorization.installAuthentication +import org.modelix.model.InMemoryModels import org.modelix.model.ModelFacade import org.modelix.model.VersionMerger import org.modelix.model.api.IBranch @@ -68,9 +69,12 @@ class ReplicatedRepositoryTest { application { installAuthentication(unitTestMode = true) installDefaultServerPlugins() - val repositoriesManager = RepositoriesManager(LocalModelClient(InMemoryStoreClient())) - ModelReplicationServer(repositoriesManager).init(this) - KeyValueLikeModelServer(repositoriesManager).init(this) + val storeClient = InMemoryStoreClient() + val modelClient = LocalModelClient(storeClient) + val repositoriesManager = RepositoriesManager(modelClient) + val inMemoryModels = InMemoryModels() + ModelReplicationServer(repositoriesManager, modelClient, inMemoryModels).init(this) + KeyValueLikeModelServer(repositoriesManager, storeClient, inMemoryModels).init(this) } coroutineScope { diff --git a/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt index 8fae66d1e3..76a678e43e 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt @@ -23,6 +23,7 @@ import io.ktor.server.testing.testApplication import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.coroutineScope import org.modelix.authorization.installAuthentication +import org.modelix.model.InMemoryModels import org.modelix.model.api.IConceptReference import org.modelix.model.client2.ModelClientV2 import org.modelix.model.client2.readVersionDelta @@ -41,9 +42,12 @@ class ModelReplicationServerTest { application { installAuthentication(unitTestMode = true) installDefaultServerPlugins() - val repositoriesManager = RepositoriesManager(LocalModelClient(InMemoryStoreClient())) - ModelReplicationServer(repositoriesManager).init(this) - KeyValueLikeModelServer(repositoriesManager).init(this) + val storeClient = InMemoryStoreClient() + val modelClient = LocalModelClient(storeClient) + val repositoriesManager = RepositoriesManager(modelClient) + val inMemoryModels = InMemoryModels() + ModelReplicationServer(repositoriesManager, modelClient, inMemoryModels).init(this) + KeyValueLikeModelServer(repositoriesManager, storeClient, inMemoryModels).init(this) } coroutineScope { From 472bfa734f4d1300d5d0e4f7721a25e750808b8e Mon Sep 17 00:00:00 2001 From: Oleksandr Dzhychko Date: Tue, 26 Mar 2024 15:41:25 +0100 Subject: [PATCH 2/3] fix(model-server): respond with error if delta computation fails before creating a flow Before this fix, the server started responding with status code 200 before even trying to execute `repositoriesManager.computeDelta`. --- .../server/handlers/ModelReplicationServer.kt | 5 +- .../handlers/ModelReplicationServerTest.kt | 55 ++++++++++++++++--- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt index 8d02ff460e..75dbc16207 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt @@ -379,8 +379,11 @@ class ModelReplicationServer( } private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) { + // Call `computeDelta` before starting to respond. + // It could already throw an exception, and in that case we do not want a successful response status. + val objectData = repositoriesManager.computeDelta(versionHash, baseVersionHash) respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) { - repositoriesManager.computeDelta(versionHash, baseVersionHash).asFlow() + objectData.asFlow() .flatten() .withSeparator("\n") .onEmpty { emit(versionHash) } diff --git a/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt index 76a678e43e..27465dbc52 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt @@ -16,6 +16,7 @@ package org.modelix.model.server.handlers import io.ktor.client.request.get +import io.ktor.http.HttpStatusCode import io.ktor.http.appendPathSegments import io.ktor.http.takeFrom import io.ktor.server.testing.ApplicationTestBuilder @@ -34,20 +35,26 @@ import org.modelix.model.server.installDefaultServerPlugins import org.modelix.model.server.store.InMemoryStoreClient import org.modelix.model.server.store.LocalModelClient import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.fail class ModelReplicationServerTest { - private fun runTest(block: suspend ApplicationTestBuilder.(scope: CoroutineScope) -> Unit) = testApplication { + private fun getDefaultModelReplicationServer(): ModelReplicationServer { + val storeClient = InMemoryStoreClient() + val modelClient = LocalModelClient(storeClient) + val repositoriesManager = RepositoriesManager(modelClient) + return ModelReplicationServer(repositoriesManager, modelClient, InMemoryModels()) + } + + private fun runTest( + modelReplicationServer: ModelReplicationServer = getDefaultModelReplicationServer(), + block: suspend ApplicationTestBuilder.(scope: CoroutineScope) -> Unit, + ) = testApplication { application { installAuthentication(unitTestMode = true) installDefaultServerPlugins() - val storeClient = InMemoryStoreClient() - val modelClient = LocalModelClient(storeClient) - val repositoriesManager = RepositoriesManager(modelClient) - val inMemoryModels = InMemoryModels() - ModelReplicationServer(repositoriesManager, modelClient, inMemoryModels).init(this) - KeyValueLikeModelServer(repositoriesManager, storeClient, inMemoryModels).init(this) + modelReplicationServer.init(this) } coroutineScope { @@ -92,4 +99,38 @@ class ModelReplicationServerTest { } } } + + @Test + fun `server responds with error when failing to compute delta before starting to respond`() { + // Arrange + val storeClient = InMemoryStoreClient() + val modelClient = LocalModelClient(storeClient) + val repositoriesManager = RepositoriesManager(modelClient) + val faultyRepositoriesManager = object : + IRepositoriesManager by repositoriesManager { + override suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData { + error("Unexpected error.") + } + } + val modelReplicationServer = ModelReplicationServer(faultyRepositoriesManager, modelClient, InMemoryModels()) + val url = "http://localhost/v2" + val repositoryId = RepositoryId("repo1") + val branchRef = repositoryId.getBranchReference() + + runTest(modelReplicationServer) { + repositoriesManager.createRepository(repositoryId, null) + + // Act + val response = client.get { + url { + takeFrom(url) + appendPathSegments("repositories", repositoryId.id, "branches", branchRef.branchName) + } + useVersionStreamFormat() + } + + // Assert + assertEquals(HttpStatusCode.InternalServerError, response.status) + } + } } From 8f02751998e88b8b5ed986887530ec4bef9d794d Mon Sep 17 00:00:00 2001 From: Oleksandr Dzhychko Date: Wed, 3 Apr 2024 17:50:57 +0200 Subject: [PATCH 3/3] chore(model-server): use non-blocking method to send version delta Use `respondBytesWriter` because it has a non-blocking `writeStringUtf8` method. `respondTextWriter` has only blocking methods to write data. Using a `java.io.Writer` is just an indirection without purpose. Using `respondBytesWriter` needed to take a bug with Ktor in consideration. For the expected behavior, a test was added. --- .../server/handlers/ModelReplicationServer.kt | 52 ++++++++++--- .../server/handlers/RepositoriesManager.kt | 2 + .../model/server/ModelServerTestUtil.kt | 34 ++++++++ .../handlers/ModelReplicationServerTest.kt | 78 ++++++++++++++++++- 4 files changed, 151 insertions(+), 15 deletions(-) diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt index 75dbc16207..19e0a5f80d 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt @@ -27,13 +27,17 @@ import io.ktor.server.resources.get import io.ktor.server.resources.post import io.ktor.server.resources.put import io.ktor.server.response.respond +import io.ktor.server.response.respondBytesWriter import io.ktor.server.response.respondText -import io.ktor.server.response.respondTextWriter import io.ktor.server.routing.Route import io.ktor.server.routing.route import io.ktor.server.routing.routing import io.ktor.server.websocket.webSocket +import io.ktor.util.cio.use import io.ktor.util.pipeline.PipelineContext +import io.ktor.utils.io.ByteWriteChannel +import io.ktor.utils.io.close +import io.ktor.utils.io.writeStringUtf8 import io.ktor.websocket.send import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -84,6 +88,7 @@ class ModelReplicationServer( companion object { private val LOG = LoggerFactory.getLogger(ModelReplicationServer::class.java) } + private val storeClient: IStoreClient get() = modelClient.store fun init(application: Application) { @@ -378,24 +383,47 @@ class ModelReplicationServer( respond(delta) } - private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) { + private suspend fun ApplicationCall.respondDeltaAsObjectStream( + versionHash: String, + baseVersionHash: String?, + plainText: Boolean, + ) { // Call `computeDelta` before starting to respond. // It could already throw an exception, and in that case we do not want a successful response status. val objectData = repositoriesManager.computeDelta(versionHash, baseVersionHash) - respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) { - objectData.asFlow() - .flatten() - .withSeparator("\n") - .onEmpty { emit(versionHash) } - .withIndex() - .collect { - if (it.index == 0) check(it.value == versionHash) { "First object should be the version" } - append(it.value) - } + val contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE + respondBytesWriter(contentType) { + this.useClosingWithoutCause { + objectData.asFlow() + .flatten() + .withSeparator("\n") + .onEmpty { emit(versionHash) } + .withIndex() + .collect { + if (it.index == 0) check(it.value == versionHash) { "First object should be the version" } + writeStringUtf8(it.value) + } + } } } } +/** + * Same as [[ByteWriteChannel.use]] but closing without a cause in case of an exception. + * + * Calling [[ByteWriteChannel.close]] with a cause results in not closing the connection properly. + * See ModelReplicationServerTest.`server closes connection when failing to compute delta after starting to respond` + * This will only be fixed in Ktor 3. + * See https://youtrack.jetbrains.com/issue/KTOR-4862/Ktor-hangs-if-exception-occurs-during-write-response-body + */ +private inline fun ByteWriteChannel.useClosingWithoutCause(block: ByteWriteChannel.() -> Unit) { + try { + block() + } finally { + close() + } +} + private fun Flow>.flatten() = flow { collect { emit(it.first) diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index 64ad29fb04..47314fe24c 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory import java.lang.ref.SoftReference import java.util.UUID +// The methods in this class are almost cohesive, so the number of functions is fine. +@Suppress("complexity.TooManyFunctions") class RepositoriesManager(val client: LocalModelClient) : IRepositoriesManager { init { diff --git a/model-server/src/test/kotlin/org/modelix/model/server/ModelServerTestUtil.kt b/model-server/src/test/kotlin/org/modelix/model/server/ModelServerTestUtil.kt index 2163028703..e2b4c7b444 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/ModelServerTestUtil.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/ModelServerTestUtil.kt @@ -19,11 +19,15 @@ package org.modelix.model.server import io.ktor.serialization.kotlinx.json.json import io.ktor.server.application.Application import io.ktor.server.application.install +import io.ktor.server.netty.Netty +import io.ktor.server.netty.NettyApplicationEngine import io.ktor.server.plugins.contentnegotiation.ContentNegotiation import io.ktor.server.resources.Resources import io.ktor.server.routing.IgnoreTrailingSlash import io.ktor.server.testing.ApplicationTestBuilder import io.ktor.server.websocket.WebSockets +import kotlinx.coroutines.runBlocking +import org.modelix.authorization.installAuthentication import org.modelix.model.client2.ModelClientV2 suspend fun ApplicationTestBuilder.createModelClient(): ModelClientV2 { @@ -37,3 +41,33 @@ fun Application.installDefaultServerPlugins() { install(Resources) install(IgnoreTrailingSlash) } + +/** + * Allow running a model server for test with Netty. + * + * This allows testing properties that would not be testable with [io.ktor.server.testing.testApplication] + * because the requests run on proper request threads. + * + * Examples for such properties are + * (1) errors while writing responses and + * (2) effects of blocking code on request threads. + */ +fun runWithNettyServer( + setupBlock: (application: Application) -> Unit, + testBlock: suspend (server: NettyApplicationEngine) -> Unit, +) { + val nettyServer: NettyApplicationEngine = io.ktor.server.engine.embeddedServer(Netty, port = 0) { + installAuthentication(unitTestMode = true) + installDefaultServerPlugins() + setupBlock(this) + } + + try { + nettyServer.start(wait = false) + runBlocking { + testBlock(nettyServer) + } + } finally { + nettyServer.stop() + } +} diff --git a/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt index 27465dbc52..77345c5d3f 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt @@ -15,14 +15,25 @@ package org.modelix.model.server.handlers +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.HttpTimeout +import io.ktor.client.plugins.defaultRequest import io.ktor.client.request.get import io.ktor.http.HttpStatusCode import io.ktor.http.appendPathSegments import io.ktor.http.takeFrom +import io.ktor.server.application.Application +import io.ktor.server.netty.NettyApplicationEngine import io.ktor.server.testing.ApplicationTestBuilder import io.ktor.server.testing.testApplication import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withTimeout import org.modelix.authorization.installAuthentication import org.modelix.model.InMemoryModels import org.modelix.model.api.IConceptReference @@ -32,11 +43,13 @@ import org.modelix.model.client2.runWrite import org.modelix.model.client2.useVersionStreamFormat import org.modelix.model.lazy.RepositoryId import org.modelix.model.server.installDefaultServerPlugins +import org.modelix.model.server.runWithNettyServer import org.modelix.model.server.store.InMemoryStoreClient import org.modelix.model.server.store.LocalModelClient import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.fail +import kotlin.time.Duration.Companion.seconds class ModelReplicationServerTest { @@ -47,7 +60,7 @@ class ModelReplicationServerTest { return ModelReplicationServer(repositoriesManager, modelClient, InMemoryModels()) } - private fun runTest( + private fun runWithTestModelServer( modelReplicationServer: ModelReplicationServer = getDefaultModelReplicationServer(), block: suspend ApplicationTestBuilder.(scope: CoroutineScope) -> Unit, ) = testApplication { @@ -63,7 +76,7 @@ class ModelReplicationServerTest { } @Test - fun `pulling delta does not return objects twice`() = runTest { + fun `pulling delta does not return objects twice`() = runWithTestModelServer { // Arrange val url = "http://localhost/v2" val modelClient = ModelClientV2.builder().url(url).client(client).build().also { it.init() } @@ -117,7 +130,7 @@ class ModelReplicationServerTest { val repositoryId = RepositoryId("repo1") val branchRef = repositoryId.getBranchReference() - runTest(modelReplicationServer) { + runWithTestModelServer(modelReplicationServer) { repositoriesManager.createRepository(repositoryId, null) // Act @@ -133,4 +146,63 @@ class ModelReplicationServerTest { assertEquals(HttpStatusCode.InternalServerError, response.status) } } + + @Test + fun `server closes connection when failing to compute delta after starting to respond`() = runTest { + // Arrange + val repositoryId = RepositoryId("repo1") + val branchRef = repositoryId.getBranchReference() + val storeClient = InMemoryStoreClient() + val modelClient = LocalModelClient(storeClient) + val repositoriesManager = RepositoriesManager(modelClient) + val faultyRepositoriesManager = object : + IRepositoriesManager by repositoriesManager { + override suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData { + val originalFlow = repositoriesManager.computeDelta(versionHash, baseVersionHash).asFlow() + val brokenFlow = channelFlow> { + error("Unexpected error.") + } + return ObjectDataFlow( + flow { + emitAll(originalFlow) + emitAll(brokenFlow) + }, + ) + } + } + repositoriesManager.createRepository(repositoryId, null) + + suspend fun createClient(server: NettyApplicationEngine): HttpClient { + val port = server.resolvedConnectors().first().port + return HttpClient(CIO) { + defaultRequest { + url("http://localhost:$port") + } + install(HttpTimeout) { + requestTimeoutMillis = 5_000 + } + } + } + + val modelReplicationServer = ModelReplicationServer(faultyRepositoriesManager, modelClient, InMemoryModels()) + val setupBlock = { application: Application -> modelReplicationServer.init(application) } + val testBlock: suspend (server: NettyApplicationEngine) -> Unit = { server -> + withTimeout(10.seconds) { + val client = createClient(server) + // Act + val response = client.get { + url { + takeFrom(url) + appendPathSegments("v2", "repositories", repositoryId.id, "branches", branchRef.branchName) + } + useVersionStreamFormat() + } + + // Assert + // The response should be delivered even if an exception is thrown inside the flow. + assertEquals(HttpStatusCode.OK, response.status) + } + } + runWithNettyServer(setupBlock, testBlock) + } }