From 0bd26e31a3b62ccf94dafcab5bd110aff397eb39 Mon Sep 17 00:00:00 2001 From: slisson Date: Sun, 8 Dec 2024 19:25:56 +0100 Subject: [PATCH] fix(model-server): deadlock caused by non-existing lock ordering --- .../model/server/handlers/HealthApiImpl.kt | 8 +- .../server/handlers/IRepositoriesManager.kt | 24 +- .../handlers/KeyValueLikeModelServer.kt | 54 +++-- .../server/handlers/ModelReplicationServer.kt | 74 +++--- .../server/handlers/RepositoriesManager.kt | 205 ++++++++--------- .../model/server/store/IGenericStoreClient.kt | 9 +- .../model/server/store/IImmutableStore.kt | 73 ++++++ .../model/server/store/IStoreClient.kt | 8 +- .../model/server/store/IgniteStoreClient.kt | 124 ++++++++-- .../model/server/store/InMemoryStoreClient.kt | 134 +++++++---- .../model/server/store/StoreClientAdapter.kt | 38 +++- .../model/server/store/StoreManager.kt | 22 +- .../model/server/store/TransactionLocks.kt | 77 +++++++ .../model/server/RestModelServerTest.kt | 21 +- ...niteStoreClientParallelTransactionsTest.kt | 54 ++++- .../modelix/model/server/ModelClientTest.kt | 15 +- .../modelix/model/server/ModelClientV2Test.kt | 17 +- .../modelix/model/server/StoreClientTest.kt | 32 +-- .../model/server/StoreClientWithStatistics.kt | 4 +- .../model/server/TransactionLocksTest.kt | 212 ++++++++++++++++++ .../handlers/ModelReplicationServerTest.kt | 37 ++- .../handlers/RepositoriesManagerTest.kt | 39 ++-- .../IgnitePostgresRepositoryRemovalTest.kt | 20 +- .../model/server/store/IgnitePostgresTest.kt | 126 ++++++----- .../store/InMemoryIsolatingStoreTest.kt | 20 +- .../modelix/streams/StreamExtensionsTests.kt | 33 +++ 26 files changed, 1090 insertions(+), 390 deletions(-) create mode 100644 model-server/src/main/kotlin/org/modelix/model/server/store/IImmutableStore.kt create mode 100644 model-server/src/main/kotlin/org/modelix/model/server/store/TransactionLocks.kt create mode 100644 model-server/src/test/kotlin/org/modelix/model/server/TransactionLocksTest.kt create mode 100644 streams/src/commonTest/kotlin/org/modelix/streams/StreamExtensionsTests.kt diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/HealthApiImpl.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/HealthApiImpl.kt index 146564340f..0e7ee44afb 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/HealthApiImpl.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/HealthApiImpl.kt @@ -25,9 +25,11 @@ class HealthApiImpl( private fun isHealthy(): Boolean { val store = stores.getGlobalStoreClient() - val value = toLong(store[HEALTH_KEY]) + 1 - store.put(HEALTH_KEY, java.lang.Long.toString(value)) - return toLong(store[HEALTH_KEY]) >= value + return store.getTransactionManager().runWrite { + val value = toLong(store[HEALTH_KEY]) + 1 + store.put(HEALTH_KEY, java.lang.Long.toString(value)) + toLong(store[HEALTH_KEY]) >= value + } } private fun toLong(value: String?): Long { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt index 3d691bad6c..9ef7ed81f9 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt @@ -6,6 +6,7 @@ import org.modelix.model.lazy.CLVersion import org.modelix.model.lazy.IDeserializingKeyValueStore import org.modelix.model.lazy.RepositoryId import org.modelix.model.server.store.IStoreClient +import org.modelix.model.server.store.ITransactionManager import org.modelix.model.server.store.StoreManager interface IRepositoriesManager { @@ -18,25 +19,23 @@ interface IRepositoriesManager { * If the server ID was created previously but is only stored under a legacy database key, * it also gets stored under the current and all legacy database keys. */ - suspend fun maybeInitAndGetSeverId(): String + fun maybeInitAndGetSeverId(): String fun getRepositories(): Set - suspend fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion - suspend fun removeRepository(repository: RepositoryId): Boolean + fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion + fun removeRepository(repository: RepositoryId): Boolean fun getBranches(repositoryId: RepositoryId): Set - suspend fun removeBranches(repository: RepositoryId, branchNames: Set) - /** * Same as [removeBranches] but blocking. * Caller is expected to execute it outside the request thread. */ - fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set) - suspend fun getVersion(branch: BranchReference): CLVersion? - suspend fun getVersion(repository: RepositoryId, versionHash: String): CLVersion? - suspend fun getVersionHash(branch: BranchReference): String? + fun removeBranches(repository: RepositoryId, branchNames: Set) + fun getVersion(branch: BranchReference): CLVersion? + fun getVersion(repository: RepositoryId, versionHash: String): CLVersion? + fun getVersionHash(branch: BranchReference): String? suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String - suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String + fun mergeChanges(branch: BranchReference, newVersionHash: String): String /** * Same as [mergeChanges] but blocking. @@ -54,14 +53,15 @@ interface IRepositoriesManager { fun isIsolated(repository: RepositoryId): Boolean? fun getStoreManager(): StoreManager + fun getTransactionManager(): ITransactionManager } fun IRepositoriesManager.getBranchNames(repositoryId: RepositoryId): Set { return getBranches(repositoryId).map { it.branchName }.toSet() } -fun IRepositoriesManager.getStoreClient(repository: RepositoryId?): IStoreClient { - return getStoreManager().getStoreClient(repository?.takeIf { isIsolated(it) ?: false }) +fun IRepositoriesManager.getStoreClient(repository: RepositoryId?, immutable: Boolean): IStoreClient { + return getStoreManager().getStoreClient(repository?.takeIf { isIsolated(it) ?: false }, immutable) } fun IRepositoriesManager.getAsyncStore(repository: RepositoryId?): IAsyncObjectStore { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt index 931b31ed88..be6df203fa 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt @@ -13,7 +13,6 @@ import io.ktor.server.resources.put import io.ktor.server.response.respondText import io.ktor.server.routing.routing import io.ktor.util.pipeline.PipelineContext -import kotlinx.coroutines.runBlocking import kotlinx.html.br import kotlinx.html.div import kotlinx.html.h1 @@ -34,7 +33,8 @@ import org.modelix.model.server.ModelServerPermissionSchema import org.modelix.model.server.store.ObjectInRepository import org.modelix.model.server.store.StoreManager import org.modelix.model.server.store.pollEntry -import org.modelix.model.server.store.runTransactionSuspendable +import org.modelix.model.server.store.runReadIO +import org.modelix.model.server.store.runWriteIO import org.modelix.model.server.templates.PageWithMenuBar import java.io.IOException import java.util.* @@ -67,7 +67,7 @@ class KeyValueLikeModelServer( // request to initialize it lazily, would make the code less robust. // Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need // the special conditions in the affected requests to be updated. - runBlocking { repositoriesManager.maybeInitAndGetSeverId() } + repositoriesManager.getTransactionManager().runWrite { repositoriesManager.maybeInitAndGetSeverId() } application.apply { modelServerModule() } @@ -95,7 +95,7 @@ class KeyValueLikeModelServer( get { val key = call.parameters["key"]!! checkKeyPermission(key, EPermissionType.READ) - val value = stores.getGlobalKeyValueStore()[key] + val value = stores.getTransactionManager().runRead { stores.getGlobalStoreClient()[key] } respondValue(key, value) } get { @@ -112,21 +112,23 @@ class KeyValueLikeModelServer( post { val key = call.parameters["key"]!! checkKeyPermission(key, EPermissionType.WRITE) - val value = stores.getGlobalStoreClient().generateId(key) + val value = stores.getGlobalStoreClient(false).generateId(key) call.respondText(text = value.toString()) } get { val key = call.parameters["key"]!! checkKeyPermission(key, EPermissionType.READ) - call.respondText(collect(key, this).toString(2), contentType = ContentType.Application.Json) + call.respondText(runRead { collect(key, this) }.toString(2), contentType = ContentType.Application.Json) } put { val key = call.parameters["key"]!! val value = call.receiveText() try { - putEntries(mapOf(key to value)) + stores.getTransactionManager().runWriteIO { + putEntries(mapOf(key to value)) + } call.respondText("OK") } catch (e: NotFoundException) { throw HttpException(HttpStatusCode.NotFound, title = "Not found", details = e.message, cause = e) @@ -145,7 +147,9 @@ class KeyValueLikeModelServer( } entries = sortByDependency(entries) try { - putEntries(entries) + stores.getTransactionManager().runWriteIO { + putEntries(entries) + } call.respondText(entries.size.toString() + " entries written") } catch (e: NotFoundException) { throw HttpException(HttpStatusCode.NotFound, title = "Not found", details = e.message, cause = e) @@ -164,7 +168,7 @@ class KeyValueLikeModelServer( checkKeyPermission(key, EPermissionType.READ) keys.add(key) } - val values = stores.getGlobalStoreClient().getAll(keys) + val values = runRead { stores.getGlobalStoreClient(false).getAll(keys) } for (i in keys.indices) { val respEntry = JSONObject() respEntry.put("key", keys[i]) @@ -216,7 +220,7 @@ class KeyValueLikeModelServer( if (callContext != null) { keys.forEach { callContext.checkKeyPermission(it, EPermissionType.READ) } } - val values = stores.getGlobalStoreClient().getAll(keys) + val values = stores.getGlobalStoreClient(false).getAll(keys) for (i in keys.indices) { val key = keys[i] val value = values[i] @@ -246,7 +250,7 @@ class KeyValueLikeModelServer( return result } - private suspend fun CallContext.putEntries(newEntries: Map) { + private fun CallContext.putEntries(newEntries: Map) { val referencedKeys: MutableSet = HashSet() for ((key, value) in newEntries) { checkKeyPermission(key, EPermissionType.WRITE) @@ -303,17 +307,15 @@ class KeyValueLikeModelServer( // We could try to move the objects later, but since this API is deprecated, it's not worth the effort. } - stores.getGlobalStoreClient().runTransactionSuspendable { - stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository.global(it.key) }) - stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository.global(it.key) }) - for ((branch, value) in branchChanges) { - if (value == null) { - checkPermission(ModelServerPermissionSchema.branch(branch).delete) - repositoriesManager.removeBranchesBlocking(branch.repositoryId, setOf(branch.branchName)) - } else { - checkPermission(ModelServerPermissionSchema.branch(branch).push) - repositoriesManager.mergeChangesBlocking(branch, value) - } + stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository.global(it.key) }) + stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository.global(it.key) }) + for ((branch, value) in branchChanges) { + if (value == null) { + checkPermission(ModelServerPermissionSchema.branch(branch).delete) + repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName)) + } else { + checkPermission(ModelServerPermissionSchema.branch(branch).push) + repositoriesManager.mergeChangesBlocking(branch, value) } } } @@ -363,4 +365,12 @@ class KeyValueLikeModelServer( else -> unknown() } } + + private suspend fun runRead(body: () -> R): R { + return repositoriesManager.getTransactionManager().runReadIO(body) + } + + private suspend fun runWrite(body: () -> R): R { + return repositoriesManager.getTransactionManager().runWriteIO(body) + } } diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt index cf53895abd..c45b055105 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt @@ -49,6 +49,8 @@ import org.modelix.model.server.api.v2.VersionDelta import org.modelix.model.server.api.v2.VersionDeltaStream import org.modelix.model.server.api.v2.VersionDeltaStreamV2 import org.modelix.model.server.store.StoreManager +import org.modelix.model.server.store.runReadIO +import org.modelix.model.server.store.runWriteIO import org.modelix.modelql.core.IMemoizationPersistence import org.modelix.modelql.core.IStepOutput import org.modelix.modelql.core.MonoUnboundQuery @@ -97,10 +99,12 @@ class ModelReplicationServer( override suspend fun PipelineContext.getRepositoryBranches(repository: String) { call.respondText( - repositoriesManager - .getBranchNames(repositoryId(repository)) - .filter { call.hasPermission(ModelServerPermissionSchema.repository(repository).branch(it).list) } - .joinToString("\n"), + runRead { + repositoriesManager + .getBranchNames(repositoryId(repository)) + .filter { call.hasPermission(ModelServerPermissionSchema.repository(repository).branch(it).list) } + .joinToString("\n") + }, ) } @@ -111,7 +115,9 @@ class ModelReplicationServer( ) { checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull) val branchRef = repositoryId(repository).getBranchReference(branch) - val versionHash = repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) + val versionHash = runRead { + repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) + } call.respondDelta(RepositoryId(repository), versionHash, lastKnown) } @@ -122,7 +128,7 @@ class ModelReplicationServer( ) { checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull) val branchRef = repositoryId(repository).getBranchReference(branch) - val versionHash = repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) + val versionHash = runRead { repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) } call.respond(BranchV1(branch, versionHash)) } @@ -138,11 +144,13 @@ class ModelReplicationServer( checkPermission(ModelServerPermissionSchema.repository(repositoryId).branch(branch).delete) - if (!repositoriesManager.getBranchNames(repositoryId).contains(branch)) { - throw BranchNotFoundException(branch, repositoryId.id) - } + runWrite { + if (!repositoriesManager.getBranchNames(repositoryId).contains(branch)) { + throw BranchNotFoundException(branch, repositoryId.id) + } - repositoriesManager.removeBranches(repositoryId, setOf(branch)) + repositoriesManager.removeBranches(repositoryId, setOf(branch)) + } call.respond(HttpStatusCode.NoContent) } @@ -153,7 +161,7 @@ class ModelReplicationServer( ) { checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull) val branchRef = repositoryId(repository).getBranchReference(branch) - val versionHash = repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) + val versionHash = runRead { repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) } call.respondText(versionHash) } @@ -163,19 +171,23 @@ class ModelReplicationServer( legacyGlobalStorage: Boolean?, ) { checkPermission(ModelServerPermissionSchema.repository(repository).create) - val initialVersion = repositoriesManager.createRepository( - repositoryId(repository), - call.getUserName(), - useRoleIds ?: true, - legacyGlobalStorage ?: false, - ) + val initialVersion = runWrite { + repositoriesManager.createRepository( + repositoryId(repository), + call.getUserName(), + useRoleIds ?: true, + legacyGlobalStorage ?: false, + ) + } call.respondDelta(RepositoryId(repository), initialVersion.getContentHash(), null) } override suspend fun PipelineContext.deleteRepository(repository: String) { checkPermission(ModelServerPermissionSchema.repository(repository).delete) - val foundAndDeleted = repositoriesManager.removeRepository(repositoryId(repository)) + val foundAndDeleted = runWrite { + repositoriesManager.removeRepository(repositoryId(repository)) + } if (foundAndDeleted) { call.respond(HttpStatusCode.NoContent) } else { @@ -191,8 +203,10 @@ class ModelReplicationServer( val branchRef = repositoryId(repository).getBranchReference(branch) val deltaFromClient = call.receive() deltaFromClient.checkObjectHashes() - repositoriesManager.getStoreClient(RepositoryId(repository)).putAll(deltaFromClient.getAllObjects()) - val mergedHash = repositoriesManager.mergeChanges(branchRef, deltaFromClient.versionHash) + repositoriesManager.getStoreClient(RepositoryId(repository), true).putAll(deltaFromClient.getAllObjects()) + val mergedHash = runWrite { + repositoriesManager.mergeChanges(branchRef, deltaFromClient.versionHash) + } call.respondDelta(RepositoryId(repository), mergedHash, deltaFromClient.versionHash) } @@ -217,7 +231,7 @@ class ModelReplicationServer( } val objects = withContext(Dispatchers.IO) { - repositoriesManager.getStoreClient(RepositoryId(repository)).getAll(keys) + repositoriesManager.getStoreClient(RepositoryId(repository), true).getAll(keys) } for (entry in objects) { @@ -260,7 +274,7 @@ class ModelReplicationServer( ) { val branchRef = repositoryId(repository).getBranchReference(branchName) checkPermission(ModelServerPermissionSchema.branch(branchRef).query) - val version = repositoriesManager.getVersion(branchRef) ?: throw BranchNotFoundException(branchRef) + val version = runRead { repositoriesManager.getVersion(branchRef) ?: throw BranchNotFoundException(branchRef) } LOG.trace("Running query on {} @ {}", branchRef, version) val initialTree = version.getTree() @@ -290,7 +304,9 @@ class ModelReplicationServer( baseVersion = version, operations = ops.map { it.getOriginalOp() }.toTypedArray(), ) - repositoriesManager.mergeChanges(branchRef, newVersion.getContentHash()) + runWrite { + repositoriesManager.mergeChanges(branchRef, newVersion.getContentHash()) + } } } }) @@ -330,7 +346,7 @@ class ModelReplicationServer( } withContext(Dispatchers.IO) { - repositoriesManager.getStoreClient(RepositoryId(repository)).putAll(entries, true) + repositoriesManager.getStoreClient(RepositoryId(repository), true).putAll(entries, true) } call.respondText("${entries.size} objects received") } @@ -341,7 +357,7 @@ class ModelReplicationServer( lastKnown: String?, ) { checkPermission(ModelServerPermissionSchema.legacyGlobalObjects.read) - if (stores.getGlobalStoreClient()[versionHash] == null) { + if (runRead { stores.getGlobalStoreClient()[versionHash] } == null) { throw VersionNotFoundException(versionHash) } call.respondDelta(null, versionHash, lastKnown) @@ -402,6 +418,14 @@ class ModelReplicationServer( } } } + + private suspend fun runRead(body: () -> R): R { + return repositoriesManager.getTransactionManager().runReadIO(body) + } + + private suspend fun runWrite(body: () -> R): R { + return repositoriesManager.getTransactionManager().runWriteIO(body) + } } /** diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index 8f588ec5d2..87b138cd0a 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -32,11 +32,12 @@ import org.modelix.model.persistent.CPVersion import org.modelix.model.persistent.HashUtil import org.modelix.model.persistent.SerializationUtil import org.modelix.model.server.api.v2.toMap -import org.modelix.model.server.store.IsolatingStore +import org.modelix.model.server.store.IRepositoryAwareStore +import org.modelix.model.server.store.ITransactionManager import org.modelix.model.server.store.ObjectInRepository import org.modelix.model.server.store.StoreManager +import org.modelix.model.server.store.assertWrite import org.modelix.model.server.store.pollEntry -import org.modelix.model.server.store.runTransactionSuspendable import org.modelix.streams.endOfSynchronousPipeline import org.slf4j.LoggerFactory import java.lang.ref.SoftReference @@ -45,7 +46,7 @@ import java.util.UUID // The methods in this class are almost cohesive, so the number of functions is fine. @Suppress("complexity.TooManyFunctions") class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { - constructor(store: IsolatingStore) : this(StoreManager(store)) + constructor(store: IRepositoryAwareStore) : this(StoreManager(store)) init { fun migrateLegacyRepositoriesList(infoBranch: IBranch) { @@ -61,10 +62,10 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { } fun doMigrations() { - stores.genericStore.runTransaction { + stores.getTransactionManager().runWrite { val repositoryId = RepositoryId("info") val v1BranchKey = repositoryId.getBranchReference().getKey() - val infoVersionHash = stores.getGlobalKeyValueStore()[v1BranchKey] ?: return@runTransaction + val infoVersionHash = stores.getGlobalStoreClient().get(v1BranchKey) ?: return@runWrite val infoVersion = CLVersion(infoVersionHash, getLegacyObjectStore(repositoryId)) val infoBranch: IBranch = PBranch(infoVersion.getTree(), IdGeneratorDummy()) @@ -78,6 +79,10 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { override fun getStoreManager(): StoreManager = stores + override fun getTransactionManager(): ITransactionManager { + return stores.getTransactionManager() + } + fun generateClientId(repositoryId: RepositoryId): Long { return stores.getGlobalStoreClient().generateId("$KEY_PREFIX:${repositoryId.id}:clientId") } @@ -91,20 +96,18 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { * If the server ID was created previously but is only stored under a legacy database key, * it also gets stored under the current and all legacy database keys. */ - override suspend fun maybeInitAndGetSeverId(): String { + override fun maybeInitAndGetSeverId(): String { val store = stores.getGlobalStoreClient() - return stores.runTransactionSuspendable { - var serverId = store[SERVER_ID_KEY] - if (serverId == null) { - serverId = store[LEGACY_SERVER_ID_KEY2] - ?: store[LEGACY_SERVER_ID_KEY] - ?: UUID.randomUUID().toString().replace("[^a-zA-Z0-9]".toRegex(), "") - store.put(SERVER_ID_KEY, serverId) - store.put(LEGACY_SERVER_ID_KEY, serverId) - store.put(LEGACY_SERVER_ID_KEY2, serverId) - } - serverId + var serverId = store[SERVER_ID_KEY] + if (serverId == null) { + serverId = store[LEGACY_SERVER_ID_KEY2] + ?: store[LEGACY_SERVER_ID_KEY] + ?: UUID.randomUUID().toString().replace("[^a-zA-Z0-9]".toRegex(), "") + store.put(SERVER_ID_KEY, serverId) + store.put(LEGACY_SERVER_ID_KEY, serverId) + store.put(LEGACY_SERVER_ID_KEY2, serverId) } + return serverId } override fun getRepositories(): Set { @@ -112,12 +115,14 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { } fun getRepositories(isolated: Boolean): Set { - val repositoriesList = stores.genericStore[ObjectInRepository.global(repositoriesListKey(isolated))] - val emptyRepositoriesList = repositoriesList.isNullOrBlank() - return if (emptyRepositoriesList) { - emptySet() - } else { - repositoriesList!!.lines().map { RepositoryId(it) }.toSet() + return getTransactionManager().runRead { + val repositoriesList = stores.genericStore[ObjectInRepository.global(repositoriesListKey(isolated))] + val emptyRepositoriesList = repositoriesList.isNullOrBlank() + if (emptyRepositoriesList) { + emptySet() + } else { + repositoriesList.lines().map { RepositoryId(it) }.toSet() + } } } @@ -135,35 +140,34 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { private fun repositoryExists(repositoryId: RepositoryId) = getRepositories().contains(repositoryId) - override suspend fun createRepository( + override fun createRepository( repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean, legacyGlobalStorage: Boolean, ): CLVersion { + getTransactionManager().assertWrite() val isolated = !legacyGlobalStorage val globalStore = stores.getGlobalStoreClient() - return stores.runTransactionSuspendable { - val masterBranch = repositoryId.getBranchReference() - if (repositoryExists(repositoryId)) throw RepositoryAlreadyExistsException(repositoryId.id) - val existingRepositories = getRepositories(isolated) - globalStore.put( - repositoriesListKey(isolated), - (existingRepositories + repositoryId).joinToString("\n") { it.id }, - false, - ) - stores.genericStore.put(branchListKey(repositoryId, isolated), masterBranch.branchName, false) - val initialVersion = CLVersion.createRegularVersion( - id = stores.idGenerator.generate(), - time = Clock.System.now().epochSeconds.toString(), - author = userName, - tree = CLTree(null, null, stores.getLegacyObjectStore(repositoryId.takeIf { isolated }), useRoleIds = useRoleIds), - baseVersion = null, - operations = emptyArray(), - ) - putVersionHash(masterBranch, initialVersion.getContentHash()) - initialVersion - } + val masterBranch = repositoryId.getBranchReference() + if (repositoryExists(repositoryId)) throw RepositoryAlreadyExistsException(repositoryId.id) + val existingRepositories = getRepositories(isolated) + globalStore.put( + repositoriesListKey(isolated), + (existingRepositories + repositoryId).joinToString("\n") { it.id }, + false, + ) + stores.genericStore.put(branchListKey(repositoryId, isolated), masterBranch.branchName, false) + val initialVersion = CLVersion.createRegularVersion( + id = stores.idGenerator.generate(), + time = Clock.System.now().epochSeconds.toString(), + author = userName, + tree = CLTree(null, null, stores.getLegacyObjectStore(repositoryId.takeIf { isolated }), useRoleIds = useRoleIds), + baseVersion = null, + operations = emptyArray(), + ) + putVersionHash(masterBranch, initialVersion.getContentHash()) + return initialVersion } fun getBranchNames(repositoryId: RepositoryId): Set { @@ -211,40 +215,32 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { ensureBranchesAreInList(branch.repositoryId, setOf(branch.branchName)) } - override suspend fun removeRepository(repository: RepositoryId): Boolean { + override fun removeRepository(repository: RepositoryId): Boolean { val genericStore = stores.genericStore - return stores.runTransactionSuspendable { - if (!repositoryExists(repository)) { - return@runTransactionSuspendable false - } - - for (branchName in getBranchNames(repository)) { - putVersionHash(repository.getBranchReference(branchName), null) - } - genericStore.put(branchListKey(repository), null) - val isolated = checkNotNull(isIsolated(repository)) { "Repository not found: $repository" } - val existingRepositories = getRepositories(isolated) - val remainingRepositories = existingRepositories - repository - stores.getGlobalStoreClient().put(repositoriesListKey(isolated), remainingRepositories.joinToString("\n") { it.id }) - genericStore.removeRepositoryObjects(repository) - true + if (!repositoryExists(repository)) { + return false } - } - override suspend fun removeBranches(repository: RepositoryId, branchNames: Set) { - return stores.runTransactionSuspendable { - removeBranchesBlocking(repository, branchNames) + for (branchName in getBranchNames(repository)) { + putVersionHash(repository.getBranchReference(branchName), null) } + genericStore.put(branchListKey(repository), null) + val isolated = checkNotNull(isIsolated(repository)) { "Repository not found: $repository" } + val existingRepositories = getRepositories(isolated) + val remainingRepositories = existingRepositories - repository + stores.getGlobalStoreClient().put(repositoriesListKey(isolated), remainingRepositories.joinToString("\n") { it.id }) + genericStore.removeRepositoryObjects(repository) + return true } /** * Same as [removeBranches] but blocking. * Caller is expected to execute it outside the request thread. */ - override fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set) { + override fun removeBranches(repository: RepositoryId, branchNames: Set) { if (branchNames.isEmpty()) return - stores.genericStore.runTransaction { + stores.genericStore.runWriteTransaction { val key = branchListKey(repository) val existingBranches = stores.genericStore[key]?.lines()?.toSet().orEmpty() val remainingBranches = existingBranches - branchNames @@ -255,69 +251,58 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { } } - override suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String { - return stores.runTransactionSuspendable { - mergeChangesBlocking(branch, newVersionHash) - } + override fun mergeChanges(branch: BranchReference, newVersionHash: String): String { + return mergeChangesBlocking(branch, newVersionHash) } /** * Same as [mergeChanges] but blocking. * Caller is expected to execute it outside the request thread. */ - override fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String = - stores.genericStore.runTransaction { - val headHash = getVersionHashBlocking(branch) - val mergedHash = if (headHash == null) { - newVersionHash - } else { - val legacyObjectStore = getLegacyObjectStore(branch.repositoryId) - val headVersion = CLVersion(headHash, legacyObjectStore) - val newVersion = CLVersion(newVersionHash, legacyObjectStore) - require(headVersion.getTree().getId() == newVersion.getTree().getId()) { - "Attempt to merge a model with ID '${newVersion.getTree().getId()}'" + - " into one with ID '${headVersion.getTree().getId()}'" - } - val mergedVersion = VersionMerger(legacyObjectStore, stores.idGenerator) - .mergeChange(headVersion, newVersion) - mergedVersion.getContentHash() + override fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String { + val headHash = getVersionHash(branch) + val mergedHash = if (headHash == null) { + newVersionHash + } else { + val legacyObjectStore = getLegacyObjectStore(branch.repositoryId) + val headVersion = CLVersion(headHash, legacyObjectStore) + val newVersion = CLVersion(newVersionHash, legacyObjectStore) + require(headVersion.getTree().getId() == newVersion.getTree().getId()) { + "Attempt to merge a model with ID '${newVersion.getTree().getId()}'" + + " into one with ID '${headVersion.getTree().getId()}'" } - ensureBranchInList(branch) - putVersionHash(branch, mergedHash) - mergedHash + val mergedVersion = VersionMerger(legacyObjectStore, stores.idGenerator) + .mergeChange(headVersion, newVersion) + mergedVersion.getContentHash() } + ensureBranchInList(branch) + putVersionHash(branch, mergedHash) + return mergedHash + } - override suspend fun getVersion(branch: BranchReference): CLVersion? { + override fun getVersion(branch: BranchReference): CLVersion? { return getVersionHash(branch)?.let { getVersion(branch.repositoryId, it) } } - override suspend fun getVersion(repository: RepositoryId, versionHash: String): CLVersion? { + override fun getVersion(repository: RepositoryId, versionHash: String): CLVersion? { val legacyObjectStore = getLegacyObjectStore(repository.takeIf { isIsolated(repository) == true }) return CLVersion.tryLoadFromHash(versionHash, legacyObjectStore) } - override suspend fun getVersionHash(branch: BranchReference): String? { - return stores.runTransactionSuspendable { - getVersionHashBlocking(branch) - } - } - /** * Same as [getVersionHash] but blocking. * Caller is expected to execute it outside the request thread. */ - private fun getVersionHashBlocking(branch: BranchReference): String? { - return stores.genericStore.runTransaction { - val isolated = isIsolated(branch.repositoryId) - if (isolated == null) { - // migrate existing but unknown legacy branch - val legacyHash = stores.genericStore[legacyBranchKey(branch)] ?: return@runTransaction null - ensureBranchInList(branch) - putVersionHash(branch, legacyHash) - return@runTransaction legacyHash - } else { - return@runTransaction stores.genericStore[branchKey(branch, isolated = isolated)] - } + override fun getVersionHash(branch: BranchReference): String? { + val isolated = isIsolated(branch.repositoryId) + if (isolated == null) { + // migrate existing but unknown legacy branch + val legacyHash = stores.genericStore[legacyBranchKey(branch)] ?: return null + ensureBranchInList(branch) + putVersionHash(branch, legacyHash) + return legacyHash + } else { + return stores.genericStore[branchKey(branch, isolated = isolated)] } } diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/IGenericStoreClient.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/IGenericStoreClient.kt index 2df81b0136..0ea28f1f33 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/store/IGenericStoreClient.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/IGenericStoreClient.kt @@ -7,7 +7,9 @@ import org.modelix.model.lazy.RepositoryId * A store that saves data on a per-repository basis. * The primary key is of type [ObjectInRepository]. */ -interface IsolatingStore : IGenericStoreClient { +typealias IsolatingStore = IGenericStoreClient + +interface IRepositoryAwareStore : IsolatingStore { /** * Default implementation for removing repository objects. * May be overridden by more efficient, store-specific implementations. @@ -37,5 +39,8 @@ interface IGenericStoreClient : AutoCloseable { fun listen(key: KeyT, listener: IGenericKeyListener) fun removeListener(key: KeyT, listener: IGenericKeyListener) fun generateId(key: KeyT): Long - fun runTransaction(body: () -> T): T + fun runWriteTransaction(body: () -> T): T = getTransactionManager().runWrite(body) + fun runReadTransaction(body: () -> T): T = getTransactionManager().runRead(body) + fun getTransactionManager(): ITransactionManager + fun getImmutableStore(): IImmutableStore } diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/IImmutableStore.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/IImmutableStore.kt new file mode 100644 index 0000000000..f069705a88 --- /dev/null +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/IImmutableStore.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.model.server.store + +import org.modelix.model.IGenericKeyListener + +/** + * Stores immutable objects where the key is the SHA hash of the value, which means the entries never change. + * Doesn't require transactions. + */ +interface IImmutableStore { + fun getAll(keys: Set): Map + fun addAll(entries: Map) + fun getIfCached(key: KeyT): String? +} + +fun IImmutableStore.asGenericStore() = ImmutableStoreAsGenericStore(this) + +class ImmutableStoreAsGenericStore(val store: IImmutableStore) : IGenericStoreClient { + override fun getAll(keys: Set): Map { + return store.getAll(keys) + } + + override fun getAll(): Map { + throw UnsupportedOperationException() + } + + override fun getIfCached(key: KeyT): String? { + return store.getIfCached(key) + } + + override fun putAll(entries: Map, silent: Boolean) { + store.addAll(entries.mapValues { requireNotNull(it.value) { "Deleting entries not allowed: $it" } }) + } + + override fun listen(key: KeyT, listener: IGenericKeyListener) { + throw UnsupportedOperationException() + } + + override fun removeListener(key: KeyT, listener: IGenericKeyListener) { + throw UnsupportedOperationException() + } + + override fun generateId(key: KeyT): Long { + throw UnsupportedOperationException() + } + + override fun getTransactionManager(): ITransactionManager { + throw UnsupportedOperationException() + } + + override fun getImmutableStore(): IImmutableStore { + return store + } + + override fun close() { + throw UnsupportedOperationException() + } +} diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/IStoreClient.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/IStoreClient.kt index c74940b767..aafa4380e4 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/store/IStoreClient.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/IStoreClient.kt @@ -16,11 +16,11 @@ suspend fun StoreManager.runTransactionSuspendable(body: () -> T): T { } suspend fun IsolatingStore.runTransactionSuspendable(body: () -> T): T { - return withContext(Dispatchers.IO) { runTransaction(body) } + return withContext(Dispatchers.IO) { runWriteTransaction(body) } } suspend fun IStoreClient.runTransactionSuspendable(body: () -> T): T { - return withContext(Dispatchers.IO) { runTransaction(body) } + return withContext(Dispatchers.IO) { runWriteTransaction(body) } } suspend fun pollEntry(storeClient: IsolatingStore, key: ObjectInRepository, lastKnownValue: String?): String? { @@ -53,7 +53,7 @@ suspend fun pollEntry(storeClient: IsolatingStore, key: ObjectInRepository, last // known value. // Registering the listener without needing it is less // likely to happen. - val value = storeClient[key] + val value = storeClient.runReadTransaction { storeClient[key] } if (value != lastKnownValue) { callHandler(value) return@coroutineScope @@ -65,7 +65,7 @@ suspend fun pollEntry(storeClient: IsolatingStore, key: ObjectInRepository, last } finally { storeClient.removeListener(key, listener) } - if (!handlerCalled) result = storeClient[key] + if (!handlerCalled) result = storeClient.runReadTransaction { storeClient[key] } } return result } diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/IgniteStoreClient.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/IgniteStoreClient.kt index 093d1e04b3..333b767d9d 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/store/IgniteStoreClient.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/IgniteStoreClient.kt @@ -6,13 +6,17 @@ import org.apache.ignite.IgniteCache import org.apache.ignite.Ignition import org.apache.ignite.cache.CachePeekMode import org.apache.ignite.cache.query.ScanQuery +import org.apache.ignite.internal.IgnitionEx import org.apache.ignite.lang.IgniteBiPredicate import org.apache.ignite.lang.IgniteClosure +import org.apache.ignite.transactions.TransactionConcurrency +import org.apache.ignite.transactions.TransactionIsolation import org.modelix.kotlin.utils.ContextValue import org.modelix.model.IGenericKeyListener import org.modelix.model.lazy.RepositoryId import org.modelix.model.persistent.HashUtil import org.modelix.model.server.SqlUtils +import org.modelix.model.server.handlers.ObjectValueNotFoundException import java.sql.SQLException import java.util.* import javax.cache.Cache @@ -26,6 +30,8 @@ private val LOG = KotlinLogging.logger { } */ class IgniteStoreClient(jdbcProperties: Properties? = null, private val inmemory: Boolean = false) : IsolatingStore, + ITransactionManager, + IRepositoryAwareStore, AutoCloseable { companion object { @@ -47,6 +53,11 @@ class IgniteStoreClient(jdbcProperties: Properties? = null, private val inmemory ) } + /** + * This works only with a single instance of the model server. + */ + private val locks = TransactionLocks() + /** * Instantiate an IgniteStoreClient * @@ -65,7 +76,10 @@ class IgniteStoreClient(jdbcProperties: Properties? = null, private val inmemory } } if (!inmemory) updateDatabaseSchema() - ignite = Ignition.start(javaClass.getResource(igniteConfigName)) + + // When running tests, Ignite complains about an already running instance, if we don't provide a unique name. + val instanceName = if (inmemory) UUID.randomUUID().toString() else null + ignite = IgnitionEx.start(javaClass.getResource(igniteConfigName), instanceName, null, null) cache = ignite.getOrCreateCache("model") ignite.message().localListen(ENTRY_CHANGED_TOPIC) { _: UUID?, key: Any? -> @@ -81,18 +95,22 @@ class IgniteStoreClient(jdbcProperties: Properties? = null, private val inmemory } override fun getIfCached(key: ObjectInRepository): String? { + locks.assertRead() return cache.localPeek(key, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP) } override fun getAll(keys: Set): Map { + locks.assertRead() return cache.getAll(keys) } override fun getAll(): Map { + locks.assertRead() return cache.associate { it.key to it.value } } override fun removeRepositoryObjects(repositoryId: RepositoryId) { + locks.assertWrite() if (!inmemory) { // Not all entries are in the cache. We delete them directly instead of loading them into the cache first. // This should be safe as the repository has already been removed from the list of available ones. @@ -131,19 +149,19 @@ class IgniteStoreClient(jdbcProperties: Properties? = null, private val inmemory } override fun putAll(entries: Map, silent: Boolean) { + locks.assertWrite() + // Sorting is important to avoid deadlocks (lock ordering). // The documentation of IgniteCache.putAll also states that this a requirement. val sortedEntries = entries.toSortedMap() val deletes = sortedEntries.asSequence().filter { it.value == null }.map { it.key }.toSet() val puts = sortedEntries.filterValues { it != null } - runTransaction { - if (deletes.isNotEmpty()) cache.removeAll(deletes) - if (puts.isNotEmpty()) cache.putAll(puts) - if (!silent) { - for (key in sortedEntries.keys) { - if (HashUtil.isSha256(key.key)) continue - pendingChangeMessages.entryChanged(key) - } + if (deletes.isNotEmpty()) cache.removeAll(deletes) + if (puts.isNotEmpty()) cache.putAll(puts) + if (!silent) { + for (key in sortedEntries.keys) { + if (HashUtil.isSha256(key.key)) continue + pendingChangeMessages.entryChanged(key) } } } @@ -163,19 +181,85 @@ class IgniteStoreClient(jdbcProperties: Properties? = null, private val inmemory return cache.invoke(key, ClientIdProcessor()) } - override fun runTransaction(body: () -> T): T { - val transactions = ignite.transactions() - if (transactions.tx() == null) { - transactions.txStart().use { tx -> - return pendingChangeMessages.runAndFlush { - val result = body() - tx.commit() - result + override fun runWrite(body: () -> T): T { + return locks.runWrite { + val transactions = ignite.transactions() + if (transactions.tx() == null) { + // Ignites fine-grained lock management per entry isn't suitable for our use case. + // We use a global ReentrantReadWriteLock instead, which ensures that, if there is a write transaction, + // no other read or write transactions can be active. + // This allows us to configure the lowest transaction level in Ignite with the highest performance. + // TODO Before we can support multiple model-server instances, we have to implement a cluster-wide + // global locking mechanism. + // See also https://issues.modelix.org/issue/MODELIX-344/Support-multiple-model-server-instances + transactions.txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED).use { tx -> + pendingChangeMessages.runAndFlush { + val result = body() + tx.commit() + result + } + } + } else { + // already in a transaction + body() + } + } + } + + override fun runRead(body: () -> T): T { + return locks.runRead { + val transactions = ignite.transactions() + if (transactions.tx() == null) { + transactions.txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED).use { tx -> + body() + } + } else { + // already in a transaction + body() + } + } + } + + override fun canRead(): Boolean { + return locks.canRead() + } + + override fun canWrite(): Boolean { + return locks.canWrite() + } + + override fun getTransactionManager(): ITransactionManager { + return object : ITransactionManager { + override fun canRead(): Boolean = locks.canRead() + override fun canWrite(): Boolean = locks.canWrite() + override fun runRead(body: () -> R): R = this@IgniteStoreClient.runRead(body) + override fun runWrite(body: () -> R): R = this@IgniteStoreClient.runWrite(body) + } + } + + override fun getImmutableStore(): IImmutableStore { + return object : IImmutableStore { + override fun getAll(keys: Set): Map { + keys.forEach { require(HashUtil.isSha256(it.key)) { "Not an immutable object: $it" } } + return cache.getAll(keys).mapValues { + val value = it.value + if (value == null) throw ObjectValueNotFoundException(it.key.key) + value } } - } else { - // already in a transaction - return body() + + override fun addAll(entries: Map) { + entries.forEach { + require(HashUtil.isSha256(it.key.key)) { "Not an immutable object: $it" } + HashUtil.checkObjectHash(it.key.key, it.value) + } + cache.putAll(entries) + } + + override fun getIfCached(key: ObjectInRepository): String? { + require(HashUtil.isSha256(key.key)) { "Not an immutable object: $key" } + return cache.localPeek(key, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP) + } } } diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/InMemoryStoreClient.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/InMemoryStoreClient.kt index 53a8a3e125..b46c7344c7 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/store/InMemoryStoreClient.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/InMemoryStoreClient.kt @@ -1,6 +1,8 @@ package org.modelix.model.server.store import org.modelix.model.IGenericKeyListener +import org.modelix.model.persistent.HashUtil +import org.modelix.model.server.handlers.ObjectValueNotFoundException import org.slf4j.LoggerFactory fun generateId(idStr: String?): Long { @@ -16,7 +18,7 @@ fun generateId(idStr: String?): Long { } + 1L } -class InMemoryStoreClient : IsolatingStore { +class InMemoryStoreClient : IsolatingStore, ITransactionManager, IRepositoryAwareStore { companion object { private val LOG = LoggerFactory.getLogger(InMemoryStoreClient::class.java) } @@ -25,89 +27,135 @@ class InMemoryStoreClient : IsolatingStore { private var transactionValues: MutableMap? = null private val changeNotifier = ChangeNotifier(this) private val pendingChangeMessages = PendingChangeMessages(changeNotifier::notifyListeners) + private val locks = TransactionLocks() - @Synchronized override fun get(key: ObjectInRepository): String? { + locks.assertRead() return if (transactionValues?.contains(key) == true) transactionValues!![key] else values[key] } override fun getIfCached(key: ObjectInRepository): String? { + locks.assertRead() return get(key) } - @Synchronized override fun getAll(keys: List): List { + locks.assertRead() return keys.map { get(it) } } - @Synchronized override fun getAll(): Map { + locks.assertRead() return values + (transactionValues ?: emptyMap()) } - @Synchronized override fun getAll(keys: Set): Map { + locks.assertRead() return keys.associateWith { get(it) } } - @Synchronized override fun put(key: ObjectInRepository, value: String?, silent: Boolean) { - runTransaction { - (transactionValues ?: values)[key] = value - if (!silent) { - pendingChangeMessages.entryChanged(key) - } + locks.assertWrite() + (transactionValues ?: values)[key] = value + if (!silent) { + pendingChangeMessages.entryChanged(key) } } - @Synchronized override fun putAll(entries: Map, silent: Boolean) { - runTransaction { - for ((key, value) in entries) { - put(key, value, silent) - } + locks.assertWrite() + for ((key, value) in entries) { + put(key, value, silent) } } - @Synchronized override fun listen(key: ObjectInRepository, listener: IGenericKeyListener) { changeNotifier.addListener(key, listener) } - @Synchronized override fun removeListener(key: ObjectInRepository, listener: IGenericKeyListener) { changeNotifier.removeListener(key, listener) } - @Synchronized override fun generateId(key: ObjectInRepository): Long { - val id = generateId(get(key)) - put(key, id.toString(), false) - return id - } - - @Synchronized - override fun runTransaction(body: () -> T): T { - if (transactionValues == null) { - return pendingChangeMessages.runAndFlush { - try { - transactionValues = HashMap() - val result = body() - val tValues = requireNotNull(transactionValues) { "Passed lambda set 'transactionValues' to null unexpectedly." } - - val puts = tValues.filterValues { it != null } - val deletes = tValues.filterValues { it == null }.keys - values.putAll(puts) - for (keyToDelete in deletes) { - values.remove(keyToDelete) + // This is an atomic operation that doesn't require the caller to start a transaction + return runWriteTransaction { + val id = generateId(get(key)) + put(key, id.toString(), false) + id + } + } + + override fun runWrite(body: () -> T): T { + return locks.runWrite { + if (transactionValues == null) { + pendingChangeMessages.runAndFlush { + try { + transactionValues = HashMap() + val result = body() + val tValues = requireNotNull(transactionValues) { "Passed lambda set 'transactionValues' to null unexpectedly." } + + val puts = tValues.filterValues { it != null } + val deletes = tValues.filterValues { it == null }.keys + values.putAll(puts) + for (keyToDelete in deletes) { + values.remove(keyToDelete) + } + result + } finally { + transactionValues = null } - result - } finally { - transactionValues = null } + } else { + body() + } + } + } + + override fun runRead(body: () -> T): T { + return locks.runRead(body) + } + + override fun canRead(): Boolean { + return locks.canRead() + } + + override fun canWrite(): Boolean { + return locks.canWrite() + } + + override fun getTransactionManager(): ITransactionManager { + return object : ITransactionManager { + override fun canRead(): Boolean = locks.canRead() + override fun canWrite(): Boolean = locks.canWrite() + override fun runRead(body: () -> R): R = this@InMemoryStoreClient.runRead(body) + override fun runWrite(body: () -> R): R = this@InMemoryStoreClient.runWrite(body) + } + } + + override fun getImmutableStore(): IImmutableStore { + return object : IImmutableStore { + override fun getAll(keys: Set): Map { + keys.forEach { require(HashUtil.isSha256(it.key)) { "Not an immutable object: $it" } } + return runRead { this@InMemoryStoreClient.getAll(keys) }.mapValues { + val value = it.value + if (value == null) throw ObjectValueNotFoundException(it.key.key) + value + } + } + + override fun addAll(entries: Map) { + entries.forEach { + require(HashUtil.isSha256(it.key.key)) { "Not an immutable object: $it" } + HashUtil.checkObjectHash(it.key.key, it.value) + } + runWrite { this@InMemoryStoreClient.putAll(entries) } + } + + override fun getIfCached(key: ObjectInRepository): String? { + require(HashUtil.isSha256(key.key)) { "Not an immutable object: $key" } + return runRead { this@InMemoryStoreClient.getIfCached(key) } } - } else { - return body() } } diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/StoreClientAdapter.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/StoreClientAdapter.kt index d1d7e000ce..0dba81c86c 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/store/StoreClientAdapter.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/StoreClientAdapter.kt @@ -76,8 +76,42 @@ abstract class StoreClientAdapter(val client: IsolatingStore) : IStoreClient { return client.generateId(key.withRepoScope()) } - override fun runTransaction(body: () -> T): T { - return client.runTransaction(body) + override fun getTransactionManager(): ITransactionManager { + return client.getTransactionManager() + } + + override fun getImmutableStore(): IImmutableStore { + val immutableStore = client.getImmutableStore() + return object : IImmutableStore { + override fun getAll(keys: Set): Map { + val fromRepository = immutableStore.getAll(keys.map { it.withRepoScope() }.toSet()).mapKeys { it.key.key } + if (getRepositoryId() == null) return fromRepository + + // Existing databases may have objects stored without information about the repository. + // Try to load these legacy entries. + val missingKeys = fromRepository.entries.asSequence().filter { it.value == null }.map { + ObjectInRepository.global( + it.key, + ) + }.toSet() + if (missingKeys.isEmpty()) return fromRepository + val fromGlobal = immutableStore.getAll(missingKeys).mapKeys { it.key.key } + + return fromRepository + fromGlobal + } + + override fun addAll(entries: Map) { + immutableStore.addAll(entries.mapKeys { it.key.withRepoScope() }) + } + + override fun getIfCached(key: String): String? { + val fromRepository = immutableStore.getIfCached(key.withRepoScope()) + if (fromRepository != null) return fromRepository + // Existing databases may have objects stored without information about the repository. + // Try to load these legacy entries. + return immutableStore.getIfCached(ObjectInRepository.global(key)) + } + } } override fun close() { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/StoreManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/StoreManager.kt index 218f71adbd..18bb1284de 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/store/StoreManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/StoreManager.kt @@ -12,19 +12,23 @@ import org.modelix.model.lazy.IDeserializingKeyValueStore import org.modelix.model.lazy.RepositoryId import java.lang.ref.SoftReference -class StoreManager(val genericStore: IsolatingStore) { +class StoreManager(val genericStore: IRepositoryAwareStore) { private val repositorySpecificStores = HashMap>() val clientId: Int by lazy { getGlobalStoreClient().generateId("clientId").toInt() } val idGenerator: IIdGenerator by lazy { IdGenerator.getInstance(clientId) } - fun getGlobalStoreClient() = getStoreClient(null) + fun getTransactionManager(): ITransactionManager = genericStore.getTransactionManager() - fun getStoreClient(repository: RepositoryId?): IStoreClient { - return if (repository == null) { - genericStore.forGlobalRepository() - } else { - genericStore.forRepository(repository) + fun getGlobalStoreClient(immutable: Boolean = false) = getStoreClient(null, immutable) + + fun getStoreClient(repository: RepositoryId?, immutable: Boolean): IStoreClient { + return (if (immutable) genericStore.getImmutableStore().asGenericStore() else genericStore).let { + if (repository == null) { + it.forGlobalRepository() + } else { + it.forRepository(repository) + } } } @@ -35,7 +39,7 @@ class StoreManager(val genericStore: IsolatingStore) { val newStore = BulkAsyncStore( CachingAsyncStore( - StoreClientAsAsyncStore(getStoreClient(repository)), + StoreClientAsAsyncStore(getStoreClient(repository, true)), cacheSize = System.getenv("MODELIX_OBJECT_CACHE_SIZE")?.toIntOrNull() ?: 500_000, ), ) @@ -48,7 +52,7 @@ class StoreManager(val genericStore: IsolatingStore) { fun getGlobalKeyValueStore() = getKeyValueStore(null) fun getKeyValueStore(repository: RepositoryId?): IKeyValueStore { - return StoreClientAsKeyValueStore(getStoreClient(repository)) + return StoreClientAsKeyValueStore(getStoreClient(repository, true)) } fun asModelClient(repository: RepositoryId?): IModelClient { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/TransactionLocks.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/TransactionLocks.kt new file mode 100644 index 0000000000..4071124746 --- /dev/null +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/TransactionLocks.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.model.server.store + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import java.util.concurrent.locks.ReentrantReadWriteLock +import kotlin.concurrent.withLock + +interface ITransactionManager { + fun canRead(): Boolean + fun canWrite(): Boolean + fun runRead(body: () -> R): R + fun runWrite(body: () -> R): R +} + +suspend fun ITransactionManager.runWriteIO(body: () -> R): R { + return withContext(Dispatchers.IO) { + runWrite(body) + } +} + +suspend fun ITransactionManager.runReadIO(body: () -> R): R { + return withContext(Dispatchers.IO) { + runRead(body) + } +} + +fun ITransactionManager.assertRead() { + if (!canRead()) throw MissingReadTransactionException() +} + +fun ITransactionManager.assertWrite() { + if (!canWrite()) throw MissingWriteTransactionException() +} + +abstract class MissingTransactionException(message: String) : RuntimeException() +class MissingReadTransactionException() : RuntimeException("Read transaction required") +class MissingWriteTransactionException() : RuntimeException("Write transaction required") + +/** + * This is mostly equivalent to how MPS manages locks on the model. + */ +class TransactionLocks : ITransactionManager { + private val readWriteLock = ReentrantReadWriteLock() + private val readLock = readWriteLock.readLock() + private val writeLock = readWriteLock.writeLock() + + override fun canRead() = readWriteLock.readHoldCount != 0 || readWriteLock.isWriteLockedByCurrentThread + + override fun canWrite() = readWriteLock.isWriteLockedByCurrentThread && readWriteLock.readHoldCount == 0 + + override fun runRead(body: () -> R): R { + return readLock.withLock(body) + } + + override fun runWrite(body: () -> R): R { + check(readWriteLock.readHoldCount == 0) { + "deadlock prevention: do not start write transaction from read" + } + return writeLock.withLock(body) + } +} diff --git a/model-server/src/test/java/org/modelix/model/server/RestModelServerTest.kt b/model-server/src/test/java/org/modelix/model/server/RestModelServerTest.kt index 41c3cdf663..a4556b84cd 100644 --- a/model-server/src/test/java/org/modelix/model/server/RestModelServerTest.kt +++ b/model-server/src/test/java/org/modelix/model/server/RestModelServerTest.kt @@ -6,7 +6,6 @@ import org.modelix.model.server.handlers.KeyValueLikeModelServer import org.modelix.model.server.handlers.RepositoriesManager import org.modelix.model.server.store.InMemoryStoreClient import org.modelix.model.server.store.forGlobalRepository -import org.modelix.model.server.store.getGenericStore class RestModelServerTest { @Test @@ -20,9 +19,10 @@ class RestModelServerTest { @Test fun testCollectExistingKeyNotHash() { - val storeClient = InMemoryStoreClient().forGlobalRepository() + val genericStore = InMemoryStoreClient() + val storeClient = genericStore.forGlobalRepository() storeClient.put("existingKey", "foo", false) - val rms = KeyValueLikeModelServer(RepositoriesManager(storeClient.getGenericStore())) + val rms = KeyValueLikeModelServer(RepositoriesManager(genericStore)) val result = rms.collect("existingKey", null) Assert.assertEquals(1, result.length().toLong()) Assert.assertEquals( @@ -35,12 +35,13 @@ class RestModelServerTest { @Test fun testCollectExistingKeyHash() { - val storeClient = InMemoryStoreClient().forGlobalRepository() + val genericStore = InMemoryStoreClient() + val storeClient = genericStore.forGlobalRepository() storeClient.put("existingKey", "hash-*0123456789-0123456789-0123456789-00001", false) storeClient.put("hash-*0123456789-0123456789-0123456789-00001", "bar", false) val rms = KeyValueLikeModelServer( - RepositoriesManager(storeClient.getGenericStore()), + RepositoriesManager(genericStore), ) val result = rms.collect("existingKey", null) Assert.assertEquals(2, result.length().toLong()) @@ -58,7 +59,8 @@ class RestModelServerTest { @Test fun testCollectExistingKeyHashChained() { - val storeClient = InMemoryStoreClient().forGlobalRepository() + val genericStore = InMemoryStoreClient() + val storeClient = genericStore.forGlobalRepository() storeClient.put("root", "hash-*0123456789-0123456789-0123456789-00001", false) storeClient.put( "hash-*0123456789-0123456789-0123456789-00001", @@ -78,7 +80,7 @@ class RestModelServerTest { storeClient.put("hash-*0123456789-0123456789-0123456789-00004", "end", false) val rms = KeyValueLikeModelServer( - RepositoriesManager(storeClient.getGenericStore()), + RepositoriesManager(genericStore), ) val result = rms.collect("root", null) Assert.assertEquals(5, result.length().toLong()) @@ -111,7 +113,8 @@ class RestModelServerTest { @Test fun testCollectExistingKeyHashChainedWithRepetitions() { - val storeClient = InMemoryStoreClient().forGlobalRepository() + val genericStore = InMemoryStoreClient() + val storeClient = genericStore.forGlobalRepository() storeClient.put("root", "hash-*0123456789-0123456789-0123456789-00001", false) storeClient.put( "hash-*0123456789-0123456789-0123456789-00001", @@ -130,7 +133,7 @@ class RestModelServerTest { ) val rms = KeyValueLikeModelServer( - RepositoriesManager(storeClient.getGenericStore()), + RepositoriesManager(genericStore), ) val result = rms.collect("root", null) Assert.assertEquals(4, result.length().toLong()) diff --git a/model-server/src/test/kotlin/org/modelix/model/server/IgniteStoreClientParallelTransactionsTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/IgniteStoreClientParallelTransactionsTest.kt index 68c396375f..bf99ae8dc3 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/IgniteStoreClientParallelTransactionsTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/IgniteStoreClientParallelTransactionsTest.kt @@ -28,6 +28,7 @@ abstract class StoreClientParallelTransactionsTest(val store: IStoreClient) { store.close() } + @Ignore("write transactions cannot run in parallel anymore") @Test fun `notifications are not lost because of parallel transactions`() = runTest { val threadBlocker = ThreadBlocker() @@ -43,7 +44,7 @@ abstract class StoreClientParallelTransactionsTest(val store: IStoreClient) { ) launch(Dispatchers.IO) { - store.runTransaction { + store.runWriteTransaction { store.put("key1", "valueA") threadBlocker.reachPointInTime(1) threadBlocker.sleepUntilPointInTime(2) @@ -56,7 +57,7 @@ abstract class StoreClientParallelTransactionsTest(val store: IStoreClient) { } launch(Dispatchers.IO) { - store.runTransaction { + store.runWriteTransaction { threadBlocker.sleepUntilPointInTime(1) store.put("key2", "valueB") threadBlocker.reachPointInTime(2) @@ -68,6 +69,55 @@ abstract class StoreClientParallelTransactionsTest(val store: IStoreClient) { val notifiedValue = notifiedValueFuture.get(10, TimeUnit.SECONDS) assertEquals("valueB", notifiedValue) } + + @Test + fun `non-ordered writes don't result in a deadlock`() = runTest { + // fine-grained locks always have the potential to cause a deadlock when they are not acquired in the + // same order (lock ordering). + // Ignite manages locks per entry. We already sort the entries in putAll, but that's not sufficient because + // There can still be multiple put calls. + // This test reproduces this issue which is now fixed by preventing parallel write transactions. + + val threadBlocker = ThreadBlocker() + + launch(Dispatchers.IO) { + println("(1) launched") + threadBlocker.reachPointInTime(1) + threadBlocker.sleepUntilPointInTime(2) + store.runWriteTransaction { + println("(1) inside transaction") + + println("(1) writing a = c1") + store.put("a", "c1") + println("(1) done a = c1") + + Thread.sleep(100) + + println("(1) writing b = c1") + store.put("b", "c1") + println("(1) done b = c1") + } + } + + launch(Dispatchers.IO) { + println("(2) launched") + threadBlocker.sleepUntilPointInTime(1) + threadBlocker.reachPointInTime(2) + store.runWriteTransaction { + println("(2) inside transaction") + + println("(2) writing b = c2") + store.put("b", "c2") + println("(1) done b = c2") + + Thread.sleep(100) + + println("(2) writing a = c2") + store.put("a", "c2") + println("(2) done a = c2") + } + } + } } class ThreadBlocker { diff --git a/model-server/src/test/kotlin/org/modelix/model/server/ModelClientTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/ModelClientTest.kt index 16a26f6737..82fd9d308f 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/ModelClientTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/ModelClientTest.kt @@ -7,6 +7,7 @@ import io.ktor.server.testing.ApplicationTestBuilder import io.ktor.server.testing.testApplication import kotlinx.coroutines.delay import kotlinx.coroutines.withTimeout +import mu.KotlinLogging import org.modelix.authorization.installAuthentication import org.modelix.model.IKeyListener import org.modelix.model.client.RestWebModelClient @@ -20,14 +21,20 @@ import kotlin.test.assertNotNull import kotlin.test.fail import kotlin.time.Duration.Companion.seconds +private val LOG = KotlinLogging.logger { } + class ModelClientTest { private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication { application { - installAuthentication(unitTestMode = true) - install(Resources) - install(IgnoreTrailingSlash) - KeyValueLikeModelServer(RepositoriesManager(InMemoryStoreClient())).init(this) + try { + installAuthentication(unitTestMode = true) + install(Resources) + install(IgnoreTrailingSlash) + KeyValueLikeModelServer(RepositoriesManager(InMemoryStoreClient())).init(this) + } catch (ex: Throwable) { + LOG.error("", ex) + } } block() } diff --git a/model-server/src/test/kotlin/org/modelix/model/server/ModelClientV2Test.kt b/model-server/src/test/kotlin/org/modelix/model/server/ModelClientV2Test.kt index ca7f777cde..598f9559de 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/ModelClientV2Test.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/ModelClientV2Test.kt @@ -2,6 +2,7 @@ package org.modelix.model.server import io.ktor.server.testing.ApplicationTestBuilder import io.ktor.server.testing.testApplication +import mu.KotlinLogging import org.modelix.model.api.IConceptReference import org.modelix.model.api.INode import org.modelix.model.api.ITree @@ -36,16 +37,22 @@ import kotlin.test.assertFailsWith import kotlin.test.assertFalse import kotlin.test.assertTrue +private val LOG = KotlinLogging.logger { } + class ModelClientV2Test { private lateinit var statistics: StoreClientWithStatistics private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication { application { - installDefaultServerPlugins() - statistics = StoreClientWithStatistics(InMemoryStoreClient()) - val repoManager = RepositoriesManager(statistics) - ModelReplicationServer(repoManager).init(this) - IdsApiImpl(repoManager).init(this) + try { + installDefaultServerPlugins() + statistics = StoreClientWithStatistics(InMemoryStoreClient()) + val repoManager = RepositoriesManager(statistics) + ModelReplicationServer(repoManager).init(this) + IdsApiImpl(repoManager).init(this) + } catch (ex: Throwable) { + LOG.error("", ex) + } } block() } diff --git a/model-server/src/test/kotlin/org/modelix/model/server/StoreClientTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/StoreClientTest.kt index a345c08466..9e8e83f489 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/StoreClientTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/StoreClientTest.kt @@ -7,6 +7,7 @@ import org.modelix.model.IKeyListener import org.modelix.model.server.store.IStoreClient import org.modelix.model.server.store.IgniteStoreClient import org.modelix.model.server.store.InMemoryStoreClient +import org.modelix.model.server.store.MissingWriteTransactionException import org.modelix.model.server.store.forGlobalRepository import java.util.Collections import kotlin.random.Random @@ -14,7 +15,6 @@ import kotlin.test.AfterTest import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith -import kotlin.test.assertNull class MapBasedStoreClientTest : StoreClientTest(InMemoryStoreClient().forGlobalRepository()) class IgniteStoreClientTest : StoreClientTest(IgniteStoreClient(inmemory = true).forGlobalRepository()) @@ -28,20 +28,20 @@ abstract class StoreClientTest(val store: IStoreClient) { @Test fun `transaction can be started from inside a transaction`() { - store.runTransaction { - store.runTransaction { + store.runWriteTransaction { + store.runWriteTransaction { store.put("abc", "def") } } } @Test - fun `allow put without transaction`() { + fun `put without transaction fails`() { val key = "ljnrdlfkesmgf" val value = "izujztdrsew" - assertNull(store.get(key)) - store.put(key, value) - assertEquals(value, store.get(key)) + assertFailsWith { + store.put(key, value) + } } @Test @@ -50,7 +50,7 @@ abstract class StoreClientTest(val store: IStoreClient) { repeat(2) { val rand = Random(it) launch { - store.runTransaction { + store.runWriteTransaction { repeat(10) { val value = rand.nextInt().toString() store.put(key, value) @@ -69,16 +69,16 @@ abstract class StoreClientTest(val store: IStoreClient) { val value1 = "a" val value2 = "b" - store.put(key, value1) - assertEquals(value1, store.get(key)) + store.runWriteTransaction { store.put(key, value1) } + assertEquals(value1, store.runReadTransaction { store.get(key) }) assertFailsWith(NullPointerException::class) { - store.runTransaction { + store.runWriteTransaction { store.put(key, value2) assertEquals(value2, store.get(key)) throw NullPointerException() } } - assertEquals(value1, store.get(key)) // failed transaction should be rolled back + assertEquals(value1, store.runReadTransaction { store.get(key) }) // failed transaction should be rolled back } @Test @@ -99,8 +99,8 @@ abstract class StoreClientTest(val store: IStoreClient) { }, ) - store.put(key, value1) - assertEquals(value1, store.get(key)) + store.runWriteTransaction { store.put(key, value1) } + assertEquals(value1, store.runReadTransaction { store.get(key) }) assertEquals(setOf(value1), valuesSeenByListener) valuesSeenByListener.clear() @@ -108,7 +108,7 @@ abstract class StoreClientTest(val store: IStoreClient) { coroutineScope { launch { assertFailsWith(NullPointerException::class) { - store.runTransaction { + store.runWriteTransaction { assertEquals(value1, store.get(key)) store.put(key, value2, silent = false) assertEquals(value2, store.get(key)) @@ -118,7 +118,7 @@ abstract class StoreClientTest(val store: IStoreClient) { } launch { - store.runTransaction { + store.runWriteTransaction { assertEquals(value1, store.get(key)) store.put(key, value3, silent = false) assertEquals(value3, store.get(key)) diff --git a/model-server/src/test/kotlin/org/modelix/model/server/StoreClientWithStatistics.kt b/model-server/src/test/kotlin/org/modelix/model/server/StoreClientWithStatistics.kt index c8a0b1cdf3..f22ddba708 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/StoreClientWithStatistics.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/StoreClientWithStatistics.kt @@ -1,10 +1,10 @@ package org.modelix.model.server -import org.modelix.model.server.store.IsolatingStore +import org.modelix.model.server.store.IRepositoryAwareStore import org.modelix.model.server.store.ObjectInRepository import java.util.concurrent.atomic.AtomicLong -class StoreClientWithStatistics(val store: IsolatingStore) : IsolatingStore by store { +class StoreClientWithStatistics(val store: IRepositoryAwareStore) : IRepositoryAwareStore by store { private val totalRequests = AtomicLong() fun getTotalRequests() = totalRequests.get() diff --git a/model-server/src/test/kotlin/org/modelix/model/server/TransactionLocksTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/TransactionLocksTest.kt new file mode 100644 index 0000000000..f233521a36 --- /dev/null +++ b/model-server/src/test/kotlin/org/modelix/model/server/TransactionLocksTest.kt @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.model.server + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import org.modelix.model.server.store.TransactionLocks +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class TransactionLocksTest { + + @Test + fun `read inside write disables writes`() { + val locks = TransactionLocks() + locks.runWrite { + assertTrue(locks.canWrite()) + locks.runRead { + assertFalse(locks.canWrite()) + } + } + } + + @Test + fun `write inside read isn't allowed`() { + val locks = TransactionLocks() + locks.runRead { + assertFailsWith { + locks.runWrite {} + } + } + } + + @Test + fun `cannot read outside transaction`() { + val locks = TransactionLocks() + assertFalse(locks.canRead()) + } + + @Test + fun `cannot write outside transaction`() { + val locks = TransactionLocks() + assertFalse(locks.canWrite()) + } + + @Test + fun `read transaction cannot write`() { + val locks = TransactionLocks() + locks.runRead { + assertFalse(locks.canWrite()) + } + } + + @Test + fun `read transaction can read`() { + val locks = TransactionLocks() + locks.runRead { + assertTrue(locks.canRead()) + } + } + + @Test + fun `write transaction can read`() { + val locks = TransactionLocks() + locks.runWrite { + assertTrue(locks.canRead()) + } + } + + @Test + fun `write transaction can write`() { + val locks = TransactionLocks() + locks.runWrite { + assertTrue(locks.canWrite()) + } + } + + @Test + fun `write lock is exclusive`() = runTest { + val locks = TransactionLocks() + val activeWrites = AtomicInteger(0) + + launch(Dispatchers.IO) { + locks.runWrite { + activeWrites.runWithIncremented { + Thread.sleep(50) + assertEquals(1, activeWrites.get()) + Thread.sleep(50) + } + } + } + + launch(Dispatchers.IO) { + locks.runWrite { + activeWrites.runWithIncremented { + Thread.sleep(50) + assertEquals(1, activeWrites.get()) + Thread.sleep(50) + } + } + } + } + + @Test + fun `read prevents writes`() = runTest { + val locks = TransactionLocks() + val activeWrites = AtomicInteger(0) + val activeReads = AtomicInteger(0) + + launch(Dispatchers.IO) { + locks.runRead { + activeReads.runWithIncremented { + Thread.sleep(50) + assertEquals(1, activeReads.get()) + assertEquals(0, activeWrites.get()) + Thread.sleep(50) + } + } + } + + launch(Dispatchers.IO) { + Thread.sleep(50) + locks.runWrite { + activeWrites.runWithIncremented { + assertEquals(0, activeReads.get()) + assertEquals(1, activeWrites.get()) + Thread.sleep(50) + } + } + } + } + + @Test + fun `multiple reads are allowed`() = runTest { + val locks = TransactionLocks() + val activeReads = AtomicInteger(0) + + launch(Dispatchers.IO) { + locks.runRead { + activeReads.runWithIncremented { + Thread.sleep(50) + assertEquals(2, activeReads.get()) + Thread.sleep(50) + } + } + } + + launch(Dispatchers.IO) { + locks.runRead { + activeReads.runWithIncremented { + Thread.sleep(50) + assertEquals(2, activeReads.get()) + Thread.sleep(50) + } + } + } + } + + @Test + fun `read inside read`() = runTest { + val locks = TransactionLocks() + + locks.runRead { + assertTrue(locks.canRead()) + assertFalse(locks.canWrite()) + locks.runRead { + assertTrue(locks.canRead()) + assertFalse(locks.canWrite()) + } + } + } + + @Test + fun `write inside write`() = runTest { + val locks = TransactionLocks() + + locks.runWrite { + assertTrue(locks.canWrite()) + locks.runWrite { + assertTrue(locks.canWrite()) + } + } + } +} + +private fun AtomicInteger.runWithIncremented(body: () -> Unit) { + incrementAndGet() + try { + body() + } finally { + decrementAndGet() + } +} diff --git a/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt index 74ad071fed..bba276d268 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt @@ -148,7 +148,9 @@ class ModelReplicationServerTest { val repositoryId = RepositoryId("repo1") runWithTestModelServer { _, fixture -> - fixture.repositoriesManager.createRepository(repositoryId, null) + fixture.repositoriesManager.getTransactionManager().runWrite { + fixture.repositoriesManager.createRepository(repositoryId, null) + } val response = client.delete { url { @@ -194,11 +196,13 @@ class ModelReplicationServerTest { val defaultBranchRef = repositoryId.getBranchReference("master") runWithTestModelServer { _, fixture -> - fixture.repositoriesManager.createRepository(repositoryId, null) - fixture.repositoriesManager.mergeChanges( - repositoryId.getBranchReference(branch), - checkNotNull(fixture.repositoriesManager.getVersionHash(defaultBranchRef)) { "Default branch must exist" }, - ) + fixture.repositoriesManager.getTransactionManager().runWrite { + fixture.repositoriesManager.createRepository(repositoryId, null) + fixture.repositoriesManager.mergeChanges( + repositoryId.getBranchReference(branch), + checkNotNull(fixture.repositoriesManager.getVersionHash(defaultBranchRef)) { "Default branch must exist" }, + ) + } val response = client.delete { url { @@ -207,7 +211,10 @@ class ModelReplicationServerTest { } assertEquals(HttpStatusCode.NoContent, response.status) - assertFalse(fixture.repositoriesManager.getBranchNames(repositoryId).contains(branch)) + val branchNames = fixture.repositoriesManager.getTransactionManager().runRead { + fixture.repositoriesManager.getBranchNames(repositoryId) + } + assertFalse(branchNames.contains(branch)) } } @@ -233,7 +240,9 @@ class ModelReplicationServerTest { modelReplicationServer, ), ) { _, _ -> - repositoriesManager.createRepository(repositoryId, null) + repositoriesManager.getTransactionManager().runWrite { + repositoriesManager.createRepository(repositoryId, null) + } // Act val response = client.get { @@ -270,7 +279,9 @@ class ModelReplicationServerTest { ) } } - repositoriesManager.createRepository(repositoryId, null) + repositoriesManager.getTransactionManager().runWrite { + repositoriesManager.createRepository(repositoryId, null) + } suspend fun createClient(server: NettyApplicationEngine): HttpClient { val port = server.resolvedConnectors().first().port @@ -378,7 +389,9 @@ class ModelReplicationServerTest { val repositoryId = RepositoryId("repo1") runWithTestModelServer { _, fixture -> - fixture.repositoriesManager.createRepository(repositoryId, null) + fixture.repositoriesManager.getTransactionManager().runWrite { + fixture.repositoriesManager.createRepository(repositoryId, null) + } val response = client.get { url { @@ -399,7 +412,9 @@ class ModelReplicationServerTest { ContentType.parse("application/x.modelix.branch+json;version=1").withCharset(Charsets.UTF_8) runWithTestModelServer { _, fixture -> - fixture.repositoriesManager.createRepository(repositoryId, null) + fixture.repositoriesManager.getTransactionManager().runWrite { + fixture.repositoriesManager.createRepository(repositoryId, null) + } val client = createClient { install(ContentNegotiation) { json() diff --git a/model-server/src/test/kotlin/org/modelix/model/server/handlers/RepositoriesManagerTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/handlers/RepositoriesManagerTest.kt index b1e210ca80..b746849c21 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/handlers/RepositoriesManagerTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/handlers/RepositoriesManagerTest.kt @@ -4,8 +4,8 @@ import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.test.runTest import org.modelix.model.lazy.RepositoryId +import org.modelix.model.server.store.IRepositoryAwareStore import org.modelix.model.server.store.InMemoryStoreClient -import org.modelix.model.server.store.IsolatingStore import kotlin.test.AfterTest import kotlin.test.Test import kotlin.test.assertEquals @@ -13,10 +13,10 @@ import kotlin.test.assertTrue class RepositoriesManagerTest { - val store = spyk(InMemoryStoreClient()) + val store = spyk(InMemoryStoreClient()) private val repoManager = RepositoriesManager(store) - private suspend fun initRepository(repoId: RepositoryId) { + private fun initRepository(repoId: RepositoryId) { repoManager.createRepository(repoId, "testUser", useRoleIds = true, legacyGlobalStorage = false) } @@ -28,18 +28,21 @@ class RepositoriesManagerTest { @Test fun `deleting default branch works`() = runTest { val repoId = RepositoryId("branch-removal") - initRepository(repoId) - repoManager.removeBranches(repoId, setOf("master")) - val branches = repoManager.getBranches(repoId) - + repoManager.getTransactionManager().runWrite { + initRepository(repoId) + repoManager.removeBranches(repoId, setOf("master")) + } + val branches = repoManager.getTransactionManager().runRead { repoManager.getBranches(repoId) } assertTrue { branches.none { it.branchName == "master" } } } @Test fun `repository data is removed when removing repository`() = runTest { val repoId = RepositoryId("abc") - initRepository(repoId) - repoManager.removeRepository(repoId) + repoManager.getTransactionManager().runWrite { + initRepository(repoId) + repoManager.removeRepository(repoId) + } verify(exactly = 1) { store.removeRepositoryObjects(repoId) } } @@ -47,16 +50,18 @@ class RepositoriesManagerTest { fun `data of other repositories remains intact when removing a repository`() = runTest { val existingRepo = RepositoryId("existing") val toBeDeletedRepo = RepositoryId("tobedeleted") - initRepository(existingRepo) - initRepository(toBeDeletedRepo) + repoManager.getTransactionManager().runWrite { + initRepository(existingRepo) + initRepository(toBeDeletedRepo) - fun getExistingRepositoryData() = store.getAll().filterKeys { it.getRepositoryId() == existingRepo.id } + fun getExistingRepositoryData() = store.getAll().filterKeys { it.getRepositoryId() == existingRepo.id } - val dataBeforeDeletion = getExistingRepositoryData() - repoManager.removeRepository(toBeDeletedRepo) - val dataAfterDeletion = getExistingRepositoryData() + val dataBeforeDeletion = getExistingRepositoryData() + repoManager.removeRepository(toBeDeletedRepo) + val dataAfterDeletion = getExistingRepositoryData() - assertTrue(dataBeforeDeletion.isNotEmpty(), "Expected repository data was not found.") - assertEquals(dataBeforeDeletion, dataAfterDeletion, "Unexpected change in repository data.") + assertTrue(dataBeforeDeletion.isNotEmpty(), "Expected repository data was not found.") + assertEquals(dataBeforeDeletion, dataAfterDeletion, "Unexpected change in repository data.") + } } } diff --git a/model-server/src/test/kotlin/org/modelix/model/server/store/IgnitePostgresRepositoryRemovalTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/store/IgnitePostgresRepositoryRemovalTest.kt index f9bd0e15e8..a3915ec38e 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/store/IgnitePostgresRepositoryRemovalTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/store/IgnitePostgresRepositoryRemovalTest.kt @@ -51,11 +51,11 @@ class IgnitePostgresRepositoryRemovalTest { ObjectInRepository(toDelete.id, "key0") to "value0", ObjectInRepository(toDelete.id, "key1") to "value1", ) - store.putAll(entries) + store.runWrite { store.putAll(entries) } - store.removeRepositoryObjects(toDelete) + store.runWrite { store.removeRepositoryObjects(toDelete) } - assertTrue { store.getAll(entries.keys).isEmpty() } + assertTrue { store.runRead { store.getAll(entries.keys) }.isEmpty() } } @Test @@ -67,7 +67,7 @@ class IgnitePostgresRepositoryRemovalTest { check(it.updateCount == 1) } - store.removeRepositoryObjects(toDelete) + store.getTransactionManager().runWrite { store.removeRepositoryObjects(toDelete) } dbConnection.prepareStatement("SELECT * FROM modelix.model WHERE repository = ?").use { it.setString(1, toDelete.id) @@ -88,7 +88,7 @@ class IgnitePostgresRepositoryRemovalTest { check(it.updateCount == 1) } - store.removeRepositoryObjects(toDelete) + store.runWrite { store.removeRepositoryObjects(toDelete) } dbConnection.prepareStatement("SELECT * FROM modelix.model WHERE repository = ?").use { it.setString(1, existing.id) @@ -103,22 +103,22 @@ class IgnitePostgresRepositoryRemovalTest { ObjectInRepository(existing.id, "key0") to "value0", ObjectInRepository(existing.id, "key1") to "value1", ) - store.putAll(existingEntries) + store.runWrite { store.putAll(existingEntries) } val toDeleteEntries = mapOf( ObjectInRepository(toDelete.id, "key0") to "value0", ObjectInRepository(toDelete.id, "key1") to "value1", ) - store.putAll(toDeleteEntries) - store.removeRepositoryObjects(toDelete) + store.runWrite { store.putAll(toDeleteEntries) } + store.runWrite { store.removeRepositoryObjects(toDelete) } - assertEquals(existingEntries, store.getAll()) + assertEquals(existingEntries, store.runRead { store.getAll() }) } @Test fun `removing a non-existing repository does not throw`() { assertDoesNotThrow { - store.removeRepositoryObjects(RepositoryId("invalid")) + store.runWrite { store.removeRepositoryObjects(RepositoryId("invalid")) } } } } diff --git a/model-server/src/test/kotlin/org/modelix/model/server/store/IgnitePostgresTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/store/IgnitePostgresTest.kt index d89cf28e55..128239756a 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/store/IgnitePostgresTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/store/IgnitePostgresTest.kt @@ -43,93 +43,111 @@ class IgnitePostgresTest { @Test fun `can get values for multiple keys when Ignite has not cached the keys yet`() { - // The actual keys are irrelevant for this test. - // A fresh client will have no keys cached. - val keys = listOf("zK4Y2*xIEWlYlQGJL2Va4Z0ESgpWgnSQcOmnPeqt34PA", "zxgZN*oLuudxsu42ppSEGnCib8LkrSvauQk2B6T7AW6o") - .map { ObjectInRepository.global(it) } + store.runRead { + // The actual keys are irrelevant for this test. + // A fresh client will have no keys cached. + val keys = listOf("zK4Y2*xIEWlYlQGJL2Va4Z0ESgpWgnSQcOmnPeqt34PA", "zxgZN*oLuudxsu42ppSEGnCib8LkrSvauQk2B6T7AW6o") + .map { ObjectInRepository.global(it) } - val values = store.getAll(keys) + val values = store.getAll(keys) - assertTrue(values.filterNotNull().isNotEmpty()) + assertTrue(values.filterNotNull().isNotEmpty()) + } } @Test fun `store immutable object in repository`() { - val value = "immutable value " + System.nanoTime() - val hash = HashUtil.sha256(value) - val repositoryId = RepositoryId("repo1") - val key = ObjectInRepository(repositoryId.id, hash) - - assertEquals(null, store.get(key)) - store.put(key, value) - assertEquals(value, store.get(key)) - - assertEquals(null, store.get(ObjectInRepository.global(hash))) + store.runWrite { + val value = "immutable value " + System.nanoTime() + val hash = HashUtil.sha256(value) + val repositoryId = RepositoryId("repo1") + val key = ObjectInRepository(repositoryId.id, hash) + + assertEquals(null, store.get(key)) + store.put(key, value) + assertEquals(value, store.get(key)) + + assertEquals(null, store.get(ObjectInRepository.global(hash))) + } } @Test fun `store immutable object in global storage`() { - val value = "immutable value " + System.nanoTime() - val hash = HashUtil.sha256(value) - val key = ObjectInRepository.global(hash) - - assertEquals(null, store.get(key)) - store.put(key, value) - assertEquals(value, store.get(key)) + store.runWrite { + val value = "immutable value " + System.nanoTime() + val hash = HashUtil.sha256(value) + val key = ObjectInRepository.global(hash) + + assertEquals(null, store.get(key)) + store.put(key, value) + assertEquals(value, store.get(key)) + } } @Test fun `store mutable object in repository`() { - val value = "mutable value " + System.nanoTime() - val hash = "mutable key " + System.nanoTime() - val repositoryId = RepositoryId("repo1") - val key = ObjectInRepository(repositoryId.id, hash) - - assertEquals(null, store.get(key)) - store.put(key, value) - assertEquals(value, store.get(key)) - - assertEquals(null, store.get(ObjectInRepository.global(hash))) + store.runWrite { + val value = "mutable value " + System.nanoTime() + val hash = "mutable key " + System.nanoTime() + val repositoryId = RepositoryId("repo1") + val key = ObjectInRepository(repositoryId.id, hash) + + assertEquals(null, store.get(key)) + store.put(key, value) + assertEquals(value, store.get(key)) + + assertEquals(null, store.get(ObjectInRepository.global(hash))) + } } @Test fun `store mutable object in global storage`() { - val value = "mutable value " + System.nanoTime() - val hash = "mutable key " + System.nanoTime() - val key = ObjectInRepository.global(hash) - - assertEquals(null, store.get(key)) - store.put(key, value) - assertEquals(value, store.get(key)) + store.runWrite { + val value = "mutable value " + System.nanoTime() + val hash = "mutable key " + System.nanoTime() + val key = ObjectInRepository.global(hash) + + assertEquals(null, store.get(key)) + store.put(key, value) + assertEquals(value, store.get(key)) + } } @Test fun `read mutable legacy entry`() { - val key = ObjectInRepository.global(":v2:repositories") - assertEquals("courses", store.get(key)) + store.runRead { + val key = ObjectInRepository.global(":v2:repositories") + assertEquals("courses", store.get(key)) + } } @Test fun `overwrite mutable legacy entry`() { - val key = ObjectInRepository.global(":v2:repositories:courses:branches") - assertEquals("master", store.get(key)) - store.put(key, "new value") - assertEquals("new value", store.get(key)) + store.runWrite { + val key = ObjectInRepository.global(":v2:repositories:courses:branches") + assertEquals("master", store.get(key)) + store.put(key, "new value") + assertEquals("new value", store.get(key)) + } } @Test fun `delete overwritten mutable legacy entry`() { - val key = ObjectInRepository.global(":v2:repositories:courses:branches:master") - assertEquals("7fQeo*xrdfZuHZtaKhbp0OosarV5tVR8N3pW8JPkl7ZE", store.get(key)) - store.put(key, "new value") - assertEquals("new value", store.get(key)) - store.put(key, null) - assertEquals(null, store.get(key)) + store.runWrite { + val key = ObjectInRepository.global(":v2:repositories:courses:branches:master") + assertEquals("7fQeo*xrdfZuHZtaKhbp0OosarV5tVR8N3pW8JPkl7ZE", store.get(key)) + store.put(key, "new value") + assertEquals("new value", store.get(key)) + store.put(key, null) + assertEquals(null, store.get(key)) + } } @Test fun `read immutable legacy entry`() { - val key = ObjectInRepository.global("2YAqw*tJWjb2t2RMO8ypIvHn8QpHjwmIUdhdFygV9lUE") - assertEquals("S/5/0/W7HBQ*K6ZbUt76ti7r2pTscLVsWd8oiqIOIsKTpQB8Aw", store.get(key)) + store.runRead { + val key = ObjectInRepository.global("2YAqw*tJWjb2t2RMO8ypIvHn8QpHjwmIUdhdFygV9lUE") + assertEquals("S/5/0/W7HBQ*K6ZbUt76ti7r2pTscLVsWd8oiqIOIsKTpQB8Aw", store.get(key)) + } } } diff --git a/model-server/src/test/kotlin/org/modelix/model/server/store/InMemoryIsolatingStoreTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/store/InMemoryIsolatingStoreTest.kt index ee2672058f..f57a7395f5 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/store/InMemoryIsolatingStoreTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/store/InMemoryIsolatingStoreTest.kt @@ -21,11 +21,13 @@ class InMemoryIsolatingStoreTest { ObjectInRepository(repoId.id, "key0") to "value0", ObjectInRepository(repoId.id, "key1") to "value1", ) - store.putAll(entries) + store.runWrite { + store.putAll(entries) - store.removeRepositoryObjects(repoId) + store.removeRepositoryObjects(repoId) + } - assertTrue { store.getAll().isEmpty() } + assertTrue { store.runRead { store.getAll() }.isEmpty() } } @Test @@ -35,23 +37,25 @@ class InMemoryIsolatingStoreTest { ObjectInRepository(existingId.id, "key0") to "value0", ObjectInRepository(existingId.id, "key1") to "value1", ) - store.putAll(existingEntries) + store.runWrite { store.putAll(existingEntries) } val toDeleteId = RepositoryId("toDelete") val toDeleteEntries = mapOf( ObjectInRepository(toDeleteId.id, "key0") to "value0", ObjectInRepository(toDeleteId.id, "key1") to "value1", ) - store.putAll(toDeleteEntries) - store.removeRepositoryObjects(toDeleteId) + store.runWrite { + store.putAll(toDeleteEntries) + store.removeRepositoryObjects(toDeleteId) + } - assertEquals(existingEntries, store.getAll()) + assertEquals(existingEntries, store.runRead { store.getAll() }) } @Test fun `removing a non-existing repository does not throw`() { assertDoesNotThrow { - store.removeRepositoryObjects(RepositoryId("invalid")) + store.runWrite { store.removeRepositoryObjects(RepositoryId("invalid")) } } } } diff --git a/streams/src/commonTest/kotlin/org/modelix/streams/StreamExtensionsTests.kt b/streams/src/commonTest/kotlin/org/modelix/streams/StreamExtensionsTests.kt new file mode 100644 index 0000000000..f60b074800 --- /dev/null +++ b/streams/src/commonTest/kotlin/org/modelix/streams/StreamExtensionsTests.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.streams + +import com.badoo.reaktive.observable.observableOf +import com.badoo.reaktive.observable.toList +import kotlin.test.Test +import kotlin.test.assertEquals + +class StreamExtensionsTests { + + @Test + fun `distinct removes duplicates`() { + assertEquals( + listOf("g", "a", "d", "h", "z"), + observableOf("g", "g", "a", "d", "h", "z", "g", "h").distinct().toList().getSynchronous(), + ) + } +}