From 62112cf12857c5738c8c472eee99c9e9d63b6ab2 Mon Sep 17 00:00:00 2001 From: Sascha Lisson Date: Wed, 17 Apr 2024 20:54:38 +0200 Subject: [PATCH 1/3] feat(model-client): lazy loading support for IModelClientV2 --- .../kotlin/org/modelix/model/api/INode.kt | 6 + model-client/build.gradle.kts | 1 + .../modelix/model/client2/IModelClientV2.kt | 4 + .../modelix/model/client2/ModelClientV2.kt | 27 ++++- .../org/modelix/model/client2/LazyLoading.kt | 71 +++++++++++ .../modelix/model/lazy/ObjectStoreCache.kt | 5 +- .../specifications/model-server-v2.yaml | 12 ++ .../server/handlers/ModelReplicationServer.kt | 18 +++ .../modelix/model/server/LazyLoadingTest.kt | 114 ++++++++++++++++++ 9 files changed, 254 insertions(+), 4 deletions(-) create mode 100644 model-client/src/jvmMain/kotlin/org/modelix/model/client2/LazyLoading.kt create mode 100644 model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt diff --git a/model-api/src/commonMain/kotlin/org/modelix/model/api/INode.kt b/model-api/src/commonMain/kotlin/org/modelix/model/api/INode.kt index fc7e22d98b..660acfe113 100644 --- a/model-api/src/commonMain/kotlin/org/modelix/model/api/INode.kt +++ b/model-api/src/commonMain/kotlin/org/modelix/model/api/INode.kt @@ -314,6 +314,10 @@ interface IReplaceableNode : INode { @Deprecated("Use .key(INode), .key(IBranch), .key(ITransaction) or .key(ITree)") fun IRole.key(): String = RoleAccessContext.getKey(this) fun IRole.key(node: INode): String = if (node.usesRoleIds()) getUID() else getSimpleName() +fun IChildLink.key(node: INode): String? = when (this) { + is NullChildLink -> null + else -> (this as IRole).key(node) +} fun INode.usesRoleIds(): Boolean = if (this is INodeEx) this.usesRoleIds() else false fun INode.getChildren(link: IChildLink): Iterable = if (this is INodeEx) getChildren(link) else getChildren(link.key(this)) fun INode.moveChild(role: IChildLink, index: Int, child: INode): Unit = if (this is INodeEx) moveChild(role, index, child) else moveChild(role.key(this), index, child) @@ -433,3 +437,5 @@ fun INode.getContainmentLink() = if (this is INodeEx) { fun INode.getRoot(): INode = parent?.getRoot() ?: this fun INode.isInstanceOf(superConcept: IConcept?): Boolean = concept.let { it != null && it.isSubConceptOf(superConcept) } fun INode.isInstanceOfSafe(superConcept: IConcept): Boolean = tryGetConcept()?.isSubConceptOf(superConcept) ?: false + +fun INode.addNewChild(role: IChildLink, index: Int) = addNewChild(role, index, null as IConceptReference?) 
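The two helpers added to INode.kt above let callers create a child without naming a concept or a containment role: NullChildLink resolves to a null role key, and the new addNewChild overload forwards a null IConceptReference. A minimal call-site sketch, assuming parentNode is an INode obtained inside a write transaction:

    // Creates an untyped child at index 0 under the null containment role.
    val child = parentNode.addNewChild(NullChildLink, 0)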
diff --git a/model-client/build.gradle.kts b/model-client/build.gradle.kts index 8622d8ba56..573f148895 100644 --- a/model-client/build.gradle.kts +++ b/model-client/build.gradle.kts @@ -21,6 +21,7 @@ val ktorVersion: String by rootProject val kotlinxSerializationVersion: String by rootProject kotlin { + jvmToolchain(11) jvm() js(IR) { browser { diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt index 289b6fb3c3..ec474c0717 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt @@ -86,4 +86,8 @@ interface IModelClientV2 { suspend fun query(branch: BranchReference, body: (IMonoStep) -> IMonoStep): R suspend fun query(repositoryId: RepositoryId, versionHash: String, body: (IMonoStep) -> IMonoStep): R + + suspend fun getObjects(repository: RepositoryId, keys: Sequence): Map + + suspend fun pushObjects(repository: RepositoryId, objects: Sequence>) } diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt index a8e9714b9f..7a441a0ff3 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt @@ -265,6 +265,29 @@ class ModelClientV2( } } + override suspend fun getObjects(repository: RepositoryId, keys: Sequence): Map { + val response = httpClient.post { + url { + takeFrom(baseUrl) + appendPathSegments("repositories", repository.id, "objects", "getAll") + } + setBody(keys.joinToString("\n")) + } + + val content = response.bodyAsChannel() + val objects = HashMap() + while (true) { + val key = checkNotNull(content.readUTF8Line()) { "Empty line expected at the end of the stream" } + if (key == "") { + check(content.readUTF8Line() == null) { "Empty line is only allowed at the end of the stream" } + break + } + val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" } + objects[key] = value + } + return objects + } + override suspend fun push(branch: BranchReference, version: IVersion, baseVersion: IVersion?): IVersion { LOG.debug { "${clientId.toString(16)}.push($branch, $version, $baseVersion)" } require(version is CLVersion) @@ -274,7 +297,7 @@ class ModelClientV2( HashUtil.checkObjectHashes(objects) val delta = if (objects.size > 1000) { // large HTTP requests and large Json objects don't scale well - uploadObjects(branch.repositoryId, objects.asSequence().map { it.key to it.value }) + pushObjects(branch.repositoryId, objects.asSequence().map { it.key to it.value }) VersionDelta(version.getContentHash(), null) } else { VersionDelta(version.getContentHash(), null, objectsMap = objects) @@ -292,7 +315,7 @@ class ModelClientV2( } } - private suspend fun uploadObjects(repository: RepositoryId, objects: Sequence>) { + override suspend fun pushObjects(repository: RepositoryId, objects: Sequence>) { LOG.debug { "${clientId.toString(16)}.pushObjects($repository)" } objects.chunked(100_000).forEach { unsortedChunk -> // Entries are sorted to avoid deadlocks on the server side between transactions. 
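The getObjects implementation above defines a line-based exchange with the new objects/getAll endpoint: the request body lists the requested hashes separated by newlines, and the response alternates hash and serialized-object lines, terminated by an empty line. A rough sketch of a call from a suspend context, where client is an initialized IModelClientV2 and hash1/hash2 are placeholder object hashes, not real values:

    val repository = RepositoryId("my-repo")
    val objects = client.getObjects(repository, sequenceOf(hash1, hash2))
    // request body sent:      "$hash1\n$hash2"
    // response body received: "$hash1\n<serialized 1>\n$hash2\n<serialized 2>\n\n"
    // The trailing empty line marks the end of the stream and lets the client detect truncated transfers.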
diff --git a/model-client/src/jvmMain/kotlin/org/modelix/model/client2/LazyLoading.kt b/model-client/src/jvmMain/kotlin/org/modelix/model/client2/LazyLoading.kt new file mode 100644 index 0000000000..831b26717c --- /dev/null +++ b/model-client/src/jvmMain/kotlin/org/modelix/model/client2/LazyLoading.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.model.client2 + +import kotlinx.coroutines.runBlocking +import org.modelix.model.IKeyListener +import org.modelix.model.IKeyValueStore +import org.modelix.model.IVersion +import org.modelix.model.lazy.BranchReference +import org.modelix.model.lazy.CLVersion +import org.modelix.model.lazy.ObjectStoreCache +import org.modelix.model.lazy.RepositoryId + +fun IModelClientV2.lazyLoadVersion(repositoryId: RepositoryId, versionHash: String, cacheSize: Int = 100_000): IVersion { + val store = ObjectStoreCache(ModelClientAsStore(this, repositoryId), cacheSize) + return CLVersion.loadFromHash(versionHash, store) +} + +suspend fun IModelClientV2.lazyLoadVersion(branchRef: BranchReference, cacheSize: Int = 100_000): IVersion { + return lazyLoadVersion(branchRef.repositoryId, pullHash(branchRef), cacheSize) +} + +class ModelClientAsStore(val client: IModelClientV2, val repositoryId: RepositoryId) : IKeyValueStore { + override fun get(key: String): String? { + return getAll(listOf(key))[key] + } + + override fun put(key: String, value: String?) 
{ + TODO("Not yet implemented") + } + + override fun getAll(keys: Iterable): Map { + return runBlocking { + client.getObjects(repositoryId, keys.asSequence()) + } + } + + override fun putAll(entries: Map) { + TODO("Not yet implemented") + } + + override fun prefetch(key: String) { + TODO("Not yet implemented") + } + + override fun listen(key: String, listener: IKeyListener) { + TODO("Not yet implemented") + } + + override fun removeListener(key: String, listener: IKeyListener) { + TODO("Not yet implemented") + } + + override fun getPendingSize(): Int { + TODO("Not yet implemented") + } +} diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/ObjectStoreCache.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/ObjectStoreCache.kt index 9fe12d2b59..389eaf4988 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/ObjectStoreCache.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/ObjectStoreCache.kt @@ -17,9 +17,10 @@ package org.modelix.model.lazy import org.modelix.model.IKeyValueStore import org.modelix.model.createLRUMap +import kotlin.jvm.JvmOverloads -class ObjectStoreCache(override val keyValueStore: IKeyValueStore) : IDeserializingKeyValueStore { - private val cache: MutableMap = createLRUMap(100000) +class ObjectStoreCache @JvmOverloads constructor(override val keyValueStore: IKeyValueStore, cacheSize: Int = 100_000) : IDeserializingKeyValueStore { + private val cache: MutableMap = createLRUMap(cacheSize) override fun getAll(hashes_: Iterable, deserializer: (String, String) -> T): Iterable { val hashes = hashes_.toList() diff --git a/model-server-openapi/specifications/model-server-v2.yaml b/model-server-openapi/specifications/model-server-v2.yaml index 278c60ebd6..526cd94f83 100644 --- a/model-server-openapi/specifications/model-server-v2.yaml +++ b/model-server-openapi/specifications/model-server-v2.yaml @@ -75,6 +75,18 @@ paths: $ref: '#/components/responses/200json' default: $ref: '#/components/responses/GeneralError' + /repositories/{repository}/objects/getAll: + post: + operationId: postRepositoryObjectsGetAll + parameters: + - name: repository + in: "path" + required: true + schema: + type: string + responses: + "200": + $ref: '#/components/responses/200' /repositories/{repository}/branches: get: operationId: getRepositoryBranches diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt index 65f419d429..4b51822244 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt @@ -25,6 +25,7 @@ import io.ktor.server.request.receiveStream import io.ktor.server.response.respond import io.ktor.server.response.respondBytesWriter import io.ktor.server.response.respondText +import io.ktor.server.response.respondTextWriter import io.ktor.server.routing.route import io.ktor.server.routing.routing import io.ktor.util.cio.use @@ -200,6 +201,23 @@ class ModelReplicationServer( } } + override suspend fun PipelineContext.postRepositoryObjectsGetAll(repository: String) { + val keys = call.receiveStream().bufferedReader().use { reader -> + reader.lineSequence().toHashSet() + } + val objects = withContext(Dispatchers.IO) { modelClient.store.getAll(keys) } + call.respondTextWriter(contentType = VersionDeltaStream.CONTENT_TYPE) { + 
objects.forEach { + append(it.key) + append("\n") + append(it.value) + append("\n") + } + // additional empty line indicates end of stream and can be used to verify completeness of data transfer + append("\n") + } + } + override suspend fun PipelineContext.pollRepositoryBranchHash( repository: String, branch: String, diff --git a/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt new file mode 100644 index 0000000000..70a515de2e --- /dev/null +++ b/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.model.server + +import io.ktor.server.testing.ApplicationTestBuilder +import io.ktor.server.testing.testApplication +import org.modelix.authorization.installAuthentication +import org.modelix.model.api.INode +import org.modelix.model.api.NullChildLink +import org.modelix.model.api.TreePointer +import org.modelix.model.api.addNewChild +import org.modelix.model.api.getDescendants +import org.modelix.model.api.getRootNode +import org.modelix.model.client2.lazyLoadVersion +import org.modelix.model.client2.runWrite +import org.modelix.model.lazy.RepositoryId +import org.modelix.model.persistent.MapBasedStore +import org.modelix.model.server.api.v2.ObjectHash +import org.modelix.model.server.api.v2.SerializedObject +import org.modelix.model.server.handlers.IdsApiImpl +import org.modelix.model.server.handlers.ModelReplicationServer +import org.modelix.model.server.store.InMemoryStoreClient +import kotlin.test.Test +import kotlin.test.assertTrue + +class LazyLoadingTest { + + private lateinit var statistics: StoreClientWithStatistics + + private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication { + application { + installAuthentication(unitTestMode = true) + installDefaultServerPlugins() + statistics = StoreClientWithStatistics(InMemoryStoreClient()) + ModelReplicationServer(statistics).init(this) + IdsApiImpl(statistics).init(this) + } + block() + } + + private fun assertRequestCount(atLeast: Long, body: () -> Unit): Long { + val requestCount = measureRequests(body) + assertTrue(requestCount >= atLeast, "At least $atLeast requests expected, but was $requestCount") + return requestCount + } + + private fun measureRequests(body: () -> Unit): Long { + val before = statistics.getTotalRequests() + body() + val after = statistics.getTotalRequests() + val requestCount = after - before + println("Requests: $requestCount") + return requestCount + } + + @Test + fun `model data is loaded on demand`() = runTest { + // After optimizing the lazy loading to send less (but bigger) requests, this test might fail. + // Just update the model size, cache size and expected request count to fix it. 
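+        // The test below builds a roughly balanced binary tree of 5_000 nodes, loads the version with a
+        // small cache (500 entries) and asserts a minimum number of requests for three traversals:
+        // a walk down to the first leaf, a full traversal, and a second full traversal that has to
+        // re-request parts that were evicted from the cache in the meantime.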
+ + val client = createModelClient() + val branchRef = RepositoryId("my-repo").getBranchReference() + client.runWrite(branchRef) { + fun createNodes(parentNode: INode, numberOfNodes: Int) { + if (numberOfNodes == 0) return + if (numberOfNodes == 1) { + parentNode.addNewChild(NullChildLink, 0) + return + } + val subtreeSize1 = numberOfNodes / 2 + val subtreeSize2 = numberOfNodes - subtreeSize1 + createNodes(parentNode.addNewChild(NullChildLink, 0), subtreeSize1 - 1) + createNodes(parentNode.addNewChild(NullChildLink, 1), subtreeSize2 - 1) + } + + createNodes(it, 5_000) + } + val version = client.lazyLoadVersion(branchRef, cacheSize = 500) + + val rootNode = TreePointer(version.getTree()).getRootNode() + + // Traverse to the first leaf node. This should load some data, but not the whole model. + assertRequestCount(1) { + generateSequence(rootNode) { it.allChildren.firstOrNull() }.count() + } + + // Traverse the whole model. + val requestCountFirstTraversal = assertRequestCount(10) { + rootNode.getDescendants(true).count() + } + + // Traverse the whole model a second time. The model doesn't fit into the cache and some parts are already + // unloaded during the first traversal. The unloaded parts need to be requested again. + // But the navigation to the first leaf is like a warmup of the cache for the whole model traversal. + // The previous traversal can benefit from that, but the next one cannot and is expected to need more requests. + assertRequestCount(requestCountFirstTraversal + 1) { + rootNode.getDescendants(true).count() + } + } +} From ff4b503515f864c1f0bd64896e6a8b68510901fb Mon Sep 17 00:00:00 2001 From: Sascha Lisson Date: Fri, 10 May 2024 08:45:46 +0200 Subject: [PATCH 2/3] refactor(model-datastructure): renamed methods in IBulkQuery IBulkQuery is not public API, so this is not considered a breaking change. --- .../org/modelix/model/lazy/BulkQuery.kt | 42 +++++++++---------- .../kotlin/org/modelix/model/lazy/CLNode.kt | 4 +- .../kotlin/org/modelix/model/lazy/CLTree.kt | 40 +++++++++--------- .../org/modelix/model/lazy/CLVersion.kt | 12 +++--- .../org/modelix/model/lazy/IBulkQuery.kt | 12 +++--- .../org/modelix/model/lazy/NonBulkQuery.kt | 14 +++---- .../model/persistent/CPHamtInternal.kt | 26 ++++++------ .../modelix/model/persistent/CPHamtLeaf.kt | 2 +- .../modelix/model/persistent/CPHamtNode.kt | 4 +- .../modelix/model/persistent/CPHamtSingle.kt | 12 +++--- .../kotlin/org/modelix/model/InMemoryModel.kt | 14 +++---- .../server/handlers/RepositoriesManager.kt | 4 +- 12 files changed, 93 insertions(+), 93 deletions(-) diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/BulkQuery.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/BulkQuery.kt index 52ac9cb7b7..a6134beb16 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/BulkQuery.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/BulkQuery.kt @@ -39,11 +39,11 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { } fun query(key: KVEntryReference, callback: (T) -> Unit) { - if (queue.size >= BATCH_SIZE && !processing) process() + if (queue.size >= BATCH_SIZE && !processing) executeQuery() queue.add(Pair(key as KVEntryReference, callback as (IKVValue?) -> Unit)) } - override fun get(hash: KVEntryReference): IBulkQuery.Value { + override fun query(hash: KVEntryReference): IBulkQuery.Value { val result = Value() query(hash) { value: T? 
-> result.success(value) } return result @@ -53,7 +53,7 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { return Value(value) } - override fun process() { + override fun executeQuery() { if (processing) { throw RuntimeException("Already processing") } @@ -83,22 +83,22 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { } } - override fun map(input_: Iterable, f: (I) -> IBulkQuery.Value): IBulkQuery.Value> { - val input = input_.toList() - if (input.isEmpty()) { + override fun flatMap(input: Iterable, f: (I) -> IBulkQuery.Value): IBulkQuery.Value> { + val inputList = input.toList() + if (inputList.isEmpty()) { return constant(emptyList()) } - val output = arrayOfNulls(input.size) - val done = BooleanArray(input.size) - var remaining = input.size + val output = arrayOfNulls(inputList.size) + val done = BooleanArray(inputList.size) + var remaining = inputList.size val result = Value>() - for (i_ in input.indices) { - f(input[i_]).onSuccess { value -> - if (done[i_]) { - return@onSuccess + for (i in inputList.indices) { + f(inputList[i]).onReceive { value -> + if (done[i]) { + return@onReceive } - output[i_] = value - done[i_] = true + output[i] = value + done[i] = true remaining-- if (remaining == 0) { result.success(output.map { e: Any? -> e as O }) @@ -131,7 +131,7 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { } @Synchronized - override fun onSuccess(handler: (T) -> Unit) { + override fun onReceive(handler: (T) -> Unit) { if (done) { handler(value as T) } else { @@ -139,8 +139,8 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { } } - override fun execute(): T { - process() + override fun executeQuery(): T { + this@BulkQuery.executeQuery() if (!done) { throw RuntimeException("No value received") } @@ -149,13 +149,13 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { override fun map(handler: (T) -> R): IBulkQuery.Value { val result = Value() - onSuccess { v -> result.success(handler(v)) } + onReceive { v -> result.success(handler(v)) } return result } - override fun mapBulk(handler: (T) -> IBulkQuery.Value): IBulkQuery.Value { + override fun flatMap(handler: (T) -> IBulkQuery.Value): IBulkQuery.Value { val result = Value() - onSuccess { v -> handler(v).onSuccess { value -> result.success(value) } } + onReceive { v -> handler(v).onReceive { value -> result.success(value) } } return result } } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLNode.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLNode.kt index 80a4a2b871..e3138dbd1a 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLNode.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLNode.kt @@ -82,9 +82,9 @@ class CLNode(private val tree: CLTree, private val data: CPNode) { getDescendants(bulkQuery, false) .map { descendants -> (sequenceOf(this) + descendants).asIterable() } } else { - getChildren(bulkQuery).mapBulk { children: Iterable -> + getChildren(bulkQuery).flatMap { children: Iterable -> val d: IBulkQuery.Value> = bulkQuery - .map(children) { child: CLNode -> child.getDescendants(bulkQuery, true) } + .flatMap(children) { child: CLNode -> child.getDescendants(bulkQuery, true) } .map { it.flatten() } d } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLTree.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLTree.kt 
index fda0e7f19c..902d7ac27f 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLTree.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLTree.kt @@ -92,7 +92,7 @@ class CLTree : ITree, IBulkTree { } fun getSize(): Long { - return (nodesMap ?: return 0L).calculateSize(store.newBulkQuery()).execute() + return (nodesMap ?: return 0L).calculateSize(store.newBulkQuery()).executeQuery() } fun prefetchAll() { @@ -315,35 +315,35 @@ class CLTree : ITree, IBulkTree { } override fun getAllChildren(parentId: Long): Iterable { - val children = getChildren(resolveElement(parentId)!!, store.newBulkQuery()).execute() + val children = getChildren(resolveElement(parentId)!!, store.newBulkQuery()).executeQuery() return children.map { it.id } } override fun getDescendants(root: Long, includeSelf: Boolean): Iterable { val parent = resolveElement(root) - return getDescendants(parent!!, store.newBulkQuery(), includeSelf).execute().map { CLNode(this, it) } + return getDescendants(parent!!, store.newBulkQuery(), includeSelf).executeQuery().map { CLNode(this, it) } } override fun getDescendants(rootIds: Iterable, includeSelf: Boolean): Iterable { val bulkQuery = store.newBulkQuery() val roots: IBulkQuery.Value> = resolveElements(rootIds.toList(), bulkQuery) - val descendants = roots.mapBulk { bulkQuery.map(it) { getDescendants(it, bulkQuery, includeSelf) } } - return descendants.execute().flatten().map { CLNode(this, it) } + val descendants = roots.flatMap { bulkQuery.flatMap(it) { getDescendants(it, bulkQuery, includeSelf) } } + return descendants.executeQuery().flatten().map { CLNode(this, it) } } override fun getAncestors(nodeIds: Iterable, includeSelf: Boolean): Set { val bulkQuery = store.newBulkQuery() val nodes: IBulkQuery.Value> = resolveElements(nodeIds, bulkQuery) - val ancestors = nodes.mapBulk { bulkQuery.map(it) { getAncestors(it, bulkQuery, includeSelf) } } + val ancestors = nodes.flatMap { bulkQuery.flatMap(it) { getAncestors(it, bulkQuery, includeSelf) } } val result = HashSet() - ancestors.execute().forEach { result.addAll(it.map { it.id }) } + ancestors.executeQuery().forEach { result.addAll(it.map { it.id }) } return result } override fun getChildren(parentId: Long, role: String?): Iterable { checkChildRoleId(parentId, role) val parent = resolveElement(parentId) - val children = getChildren(parent!!, store.newBulkQuery()).execute() + val children = getChildren(parent!!, store.newBulkQuery()).executeQuery() return children .filter { it.roleInParent == role } .map { it.id } @@ -351,7 +351,7 @@ class CLTree : ITree, IBulkTree { override fun getChildRoles(sourceId: Long): Iterable { val parent = resolveElement(sourceId) - val children: Iterable = getChildren(parent!!, store.newBulkQuery()).execute() + val children: Iterable = getChildren(parent!!, store.newBulkQuery()).executeQuery() return children.map { it.roleInParent }.distinct() } @@ -443,7 +443,7 @@ class CLTree : ITree, IBulkTree { override fun visitChanges(oldVersion: ITree, visitor: ITreeChangeVisitor) { val bulkQuery = store.newBulkQuery() visitChanges(oldVersion, visitor, bulkQuery) - (bulkQuery as? BulkQuery)?.process() + (bulkQuery as? 
BulkQuery)?.executeQuery() } fun visitChanges(oldVersion: ITree, visitor: ITreeChangeVisitor, bulkQuery: IBulkQuery) { @@ -512,7 +512,7 @@ class CLTree : ITree, IBulkTree { } } - bulkQuery.map(listOf(oldVersion.getChildren(oldElement, bulkQuery), getChildren(newElement, bulkQuery))) { it }.onSuccess { childrenLists -> + bulkQuery.flatMap(listOf(oldVersion.getChildren(oldElement, bulkQuery), getChildren(newElement, bulkQuery))) { it }.onReceive { childrenLists -> val (oldChildrenList, newChildrenList) = childrenLists val oldChildren: MutableMap> = HashMap() val newChildren: MutableMap> = HashMap() @@ -567,7 +567,7 @@ class CLTree : ITree, IBulkTree { } fun resolveElement(id: Long): CPNode? { - return resolveElement(id, NonBulkQuery(store)).execute() + return resolveElement(id, NonBulkQuery(store)).executeQuery() } fun resolveElement(id: Long, bulkQuery: IBulkQuery): IBulkQuery.Value { @@ -575,7 +575,7 @@ class CLTree : ITree, IBulkTree { return bulkQuery.constant(null) } val hash = nodesMap!!.get(id, bulkQuery) - return hash.mapBulk { + return hash.flatMap { if (it == null) throw NodeNotFoundException(id) createElement(it, bulkQuery) } @@ -587,24 +587,24 @@ class CLTree : ITree, IBulkTree { val b: IBulkQuery.Value>> = a.map { hashes: List?> -> hashes.mapIndexed { index, s -> s ?: throw NodeNotFoundException(ids[index]) } } - return b.mapBulk { hashes -> createElements(hashes, bulkQuery) } + return b.flatMap { hashes -> createElements(hashes, bulkQuery) } } fun createElement(hash: KVEntryReference?, query: IBulkQuery): IBulkQuery.Value { return if (hash == null) { query.constant(null) } else { - query[hash] + query.query(hash) } } fun createElement(hash: KVEntryReference?): CPNode? { - return createElement(hash, NonBulkQuery(store)).execute() + return createElement(hash, NonBulkQuery(store)).executeQuery() } fun createElements(hashes: List>, bulkQuery: IBulkQuery): IBulkQuery.Value> { - return bulkQuery.map(hashes) { hash: KVEntryReference -> - bulkQuery[hash].map { n -> n!! } + return bulkQuery.flatMap(hashes) { hash: KVEntryReference -> + bulkQuery.query(hash).map { n -> n!! 
} } } @@ -635,9 +635,9 @@ class CLTree : ITree, IBulkTree { getDescendants(node, bulkQuery, false) .map { descendants -> (sequenceOf(node) + descendants).asIterable() } } else { - getChildren(node, bulkQuery).mapBulk { children: Iterable -> + getChildren(node, bulkQuery).flatMap { children: Iterable -> val d: IBulkQuery.Value> = bulkQuery - .map(children) { child: CPNode -> getDescendants(child, bulkQuery, true) } + .flatMap(children) { child: CPNode -> getDescendants(child, bulkQuery, true) } .map { it.flatten() } d } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt index eee37b2420..f71a5b17c5 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt @@ -349,7 +349,7 @@ private fun computeDelta(keyValueStore: IKeyValueStore, versionHash: String, bas override fun visitChangesOnly(): Boolean = false override fun entryAdded(key: Long, value: KVEntryReference) { changedNodeIds += key - if (value != null) bulkQuery.get(value) + if (value != null) bulkQuery.query(value) } override fun entryRemoved(key: Long, value: KVEntryReference) { @@ -362,14 +362,14 @@ private fun computeDelta(keyValueStore: IKeyValueStore, versionHash: String, bas newValue: KVEntryReference, ) { changedNodeIds += key - if (newValue != null) bulkQuery.get(newValue) + if (newValue != null) bulkQuery.query(newValue) } }, bulkQuery, ) v1 = v2 } - (bulkQuery as? BulkQuery)?.process() + (bulkQuery as? BulkQuery)?.executeQuery() } val oldEntries: Map = trackAccessedEntries(keyValueStore) { store -> if (baseVersionHash == null) return@trackAccessedEntries @@ -386,12 +386,12 @@ private fun computeDelta(keyValueStore: IKeyValueStore, versionHash: String, bas val nodesMap = oldTree.nodesMap!! changedNodeIds.forEach { changedNodeId -> - nodesMap.get(changedNodeId, 0, bulkQuery).onSuccess { nodeRef: KVEntryReference? -> - if (nodeRef != null) bulkQuery.get(nodeRef) + nodesMap.get(changedNodeId, 0, bulkQuery).onReceive { nodeRef: KVEntryReference? -> + if (nodeRef != null) bulkQuery.query(nodeRef) } } - (bulkQuery as? BulkQuery)?.process() + (bulkQuery as? 
BulkQuery)?.executeQuery() } return oldAndNewEntries - oldEntries.keys } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IBulkQuery.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IBulkQuery.kt index d7e41ffd22..ce0e0e37a9 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IBulkQuery.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IBulkQuery.kt @@ -18,14 +18,14 @@ package org.modelix.model.lazy import org.modelix.model.persistent.IKVValue interface IBulkQuery { - fun process() - fun map(input_: Iterable, f: (I) -> Value): Value> + fun executeQuery() + fun flatMap(input: Iterable, f: (I) -> Value): Value> fun constant(value: T): Value - operator fun get(hash: KVEntryReference): Value + fun query(hash: KVEntryReference): Value interface Value { - fun execute(): T - fun mapBulk(handler: (T) -> Value): Value + fun executeQuery(): T + fun flatMap(handler: (T) -> Value): Value fun map(handler: (T) -> R): Value - fun onSuccess(handler: (T) -> Unit) + fun onReceive(handler: (T) -> Unit) } } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonBulkQuery.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonBulkQuery.kt index 10d358eaef..0d70318f9f 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonBulkQuery.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonBulkQuery.kt @@ -18,8 +18,8 @@ package org.modelix.model.lazy import org.modelix.model.persistent.IKVValue class NonBulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { - override fun map(input_: Iterable, f: (I) -> IBulkQuery.Value): IBulkQuery.Value> { - val list = input_.asSequence().map(f).map { it.execute() }.toList() + override fun flatMap(input: Iterable, f: (I) -> IBulkQuery.Value): IBulkQuery.Value> { + val list = input.asSequence().map(f).map { it.executeQuery() }.toList() return Value(list) } @@ -27,20 +27,20 @@ class NonBulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery return Value(value) } - override fun get(hash: KVEntryReference): IBulkQuery.Value { + override fun query(hash: KVEntryReference): IBulkQuery.Value { return constant(hash.getValue(store)) } - override fun process() { + override fun executeQuery() { // all requests are processed immediately } class Value(private val value: T) : IBulkQuery.Value { - override fun execute(): T { + override fun executeQuery(): T { return value } - override fun mapBulk(handler: (T) -> IBulkQuery.Value): IBulkQuery.Value { + override fun flatMap(handler: (T) -> IBulkQuery.Value): IBulkQuery.Value { return handler(value) } @@ -48,7 +48,7 @@ class NonBulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery return Value(handler(value)) } - override fun onSuccess(handler: (T) -> Unit) { + override fun onReceive(handler: (T) -> Unit) { handler(value) } } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtInternal.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtInternal.kt index 7b9e472228..ebc6b247fc 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtInternal.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtInternal.kt @@ -61,12 +61,12 @@ class CPHamtInternal( override fun calculateSize(bulkQuery: IBulkQuery): IBulkQuery.Value { val childRefs = data.children return bulkQuery - 
.map(childRefs.asIterable(), { bulkQuery.get(it) }) - .mapBulk { resolvedChildren: List -> + .flatMap(childRefs.asIterable(), { bulkQuery.query(it) }) + .flatMap { resolvedChildren: List -> val resolvedChildrenNN = resolvedChildren.mapIndexed { index, child -> child ?: throw RuntimeException("Entry not found in store: " + childRefs[index].getHash()) } - bulkQuery.map(resolvedChildrenNN) { it.calculateSize(bulkQuery) } + bulkQuery.flatMap(resolvedChildrenNN) { it.calculateSize(bulkQuery) } } .map { it.reduce { a, b -> a + b } } } @@ -74,7 +74,7 @@ class CPHamtInternal( override fun put(key: Long, value: KVEntryReference?, shift: Int, store: IDeserializingKeyValueStore): CPHamtNode? { require(shift <= CPHamtNode.MAX_SHIFT) { "$shift > ${CPHamtNode.MAX_SHIFT}" } val childIndex = CPHamtNode.indexFromKey(key, shift) - val child = getChild(childIndex, NonBulkQuery(store)).execute() + val child = getChild(childIndex, NonBulkQuery(store)).executeQuery() return if (child == null) { setChild(childIndex, CPHamtLeaf.create(key, value), shift, store) } else { @@ -85,7 +85,7 @@ class CPHamtInternal( override fun remove(key: Long, shift: Int, store: IDeserializingKeyValueStore): CPHamtNode? { require(shift <= CPHamtNode.MAX_SHIFT) { "$shift > ${CPHamtNode.MAX_SHIFT}" } val childIndex = CPHamtNode.indexFromKey(key, shift) - val child = getChild(childIndex, NonBulkQuery(store)).execute() + val child = getChild(childIndex, NonBulkQuery(store)).executeQuery() return if (child == null) { this } else { @@ -96,7 +96,7 @@ class CPHamtInternal( override fun get(key: Long, shift: Int, bulkQuery: IBulkQuery): IBulkQuery.Value?> { require(shift <= CPHamtNode.MAX_SHIFT) { "$shift > ${CPHamtNode.MAX_SHIFT}" } val childIndex = CPHamtNode.indexFromKey(key, shift) - return getChild(childIndex, bulkQuery).mapBulk { child: CPHamtNode? -> + return getChild(childIndex, bulkQuery).flatMap { child: CPHamtNode? -> if (child == null) { bulkQuery.constant(null) } else { @@ -116,18 +116,18 @@ class CPHamtInternal( } protected fun getChild(childHash: KVEntryReference, bulkQuery: IBulkQuery): IBulkQuery.Value { - return bulkQuery[childHash].map { childData -> + return bulkQuery.query(childHash).map { childData -> if (childData == null) throw RuntimeException("Entry not found in store: ${childHash.getHash()}") childData } } protected fun getChild(logicalIndex: Int, store: IDeserializingKeyValueStore): CPHamtNode? { - return getChild(logicalIndex, NonBulkQuery(store)).execute() + return getChild(logicalIndex, NonBulkQuery(store)).executeQuery() } protected fun getChild(childHash: KVEntryReference, store: IDeserializingKeyValueStore): CPHamtNode? { - return getChild(childHash, NonBulkQuery(store)).execute() + return getChild(childHash, NonBulkQuery(store)).executeQuery() } fun setChild(logicalIndex: Int, child: CPHamtNode?, shift: Int, store: IDeserializingKeyValueStore): CPHamtNode? 
{ @@ -161,7 +161,7 @@ class CPHamtInternal( } val newChildren = COWArrays.removeAt(data.children, physicalIndex) if (newChildren.size == 1) { - val child0 = getChild(newChildren[0], NonBulkQuery(store)).execute() + val child0 = getChild(newChildren[0], NonBulkQuery(store)).executeQuery() if (child0 is CPHamtLeaf) { return child0 } @@ -170,8 +170,8 @@ class CPHamtInternal( } override fun visitEntries(bulkQuery: IBulkQuery, visitor: (Long, KVEntryReference) -> Unit): IBulkQuery.Value { - return bulkQuery.map(data.children.asIterable()) { bulkQuery.get(it) }.mapBulk { children -> - bulkQuery.map(children) { it!!.visitEntries(bulkQuery, visitor) }.map { } + return bulkQuery.flatMap(data.children.asIterable()) { bulkQuery.query(it) }.flatMap { children -> + bulkQuery.flatMap(children) { it!!.visitEntries(bulkQuery, visitor) }.map { } } } @@ -245,7 +245,7 @@ class CPHamtInternal( } else { visitor.entryAdded(k, v) } - }.onSuccess { + }.onReceive { entryVisitingDone = true if (!oldEntryExists) visitor.entryRemoved(oldNode.key, oldNode.value) } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtLeaf.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtLeaf.kt index aca031e577..76931ae96a 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtLeaf.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtLeaf.kt @@ -90,7 +90,7 @@ class CPHamtLeaf( visitor.entryRemoved(k, v) } } - oldNode!!.visitEntries(bulkQuery, bp).onSuccess { + oldNode!!.visitEntries(bulkQuery, bp).onReceive { val oldValue = oldValue if (oldValue == null) { visitor.entryAdded(key, value) diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtNode.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtNode.kt index 7fa15c9402..ae2b50e108 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtNode.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtNode.kt @@ -41,11 +41,11 @@ abstract class CPHamtNode : IKVValue { fun get(key: Long, store: IDeserializingKeyValueStore): KVEntryReference? { val bulkQuery: IBulkQuery = NonBulkQuery(store) - return get(key, 0, bulkQuery).execute() + return get(key, 0, bulkQuery).executeQuery() } fun getAll(keys: Iterable, bulkQuery: IBulkQuery): IBulkQuery.Value?>> { - return bulkQuery.map(keys) { key: Long -> get(key, 0, bulkQuery) } + return bulkQuery.flatMap(keys) { key: Long -> get(key, 0, bulkQuery) } } fun put(key: Long, value: KVEntryReference?, store: IDeserializingKeyValueStore): CPHamtNode? 
{ diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtSingle.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtSingle.kt index 56b8e0ac88..66dec7fdd8 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtSingle.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtSingle.kt @@ -45,7 +45,7 @@ class CPHamtSingle( } override fun calculateSize(bulkQuery: IBulkQuery): IBulkQuery.Value { - return getChild(bulkQuery).mapBulk { it.calculateSize(bulkQuery) } + return getChild(bulkQuery).flatMap { it.calculateSize(bulkQuery) } } private fun maskBits(key: Long, shift: Int): Long = (key ushr (CPHamtNode.MAX_BITS - CPHamtNode.BITS_PER_LEVEL * numLevels - shift)) and mask @@ -53,8 +53,8 @@ class CPHamtSingle( override fun get(key: Long, shift: Int, bulkQuery: IBulkQuery): IBulkQuery.Value?> { require(shift <= CPHamtNode.MAX_SHIFT) { "$shift > ${CPHamtNode.MAX_SHIFT}" } if (maskBits(key, shift) == bits) { - return bulkQuery.get(child) - .mapBulk { + return bulkQuery.query(child) + .flatMap { val childData = it ?: throw RuntimeException("Entry not found in store: " + child.getHash()) childData.get(key, shift + numLevels * CPHamtNode.BITS_PER_LEVEL, bulkQuery) } @@ -66,7 +66,7 @@ class CPHamtSingle( override fun put(key: Long, value: KVEntryReference?, shift: Int, store: IDeserializingKeyValueStore): CPHamtNode? { require(shift <= CPHamtNode.MAX_SHIFT) { "$shift > ${CPHamtNode.MAX_SHIFT}" } if (maskBits(key, shift) == bits) { - return withNewChild(getChild(NonBulkQuery(store)).execute().put(key, value, shift + CPHamtNode.BITS_PER_LEVEL * numLevels, store)) + return withNewChild(getChild(NonBulkQuery(store)).executeQuery().put(key, value, shift + CPHamtNode.BITS_PER_LEVEL * numLevels, store)) } else { if (numLevels > 1) { return splitOneLevel().put(key, value, shift, store) @@ -109,11 +109,11 @@ class CPHamtSingle( } fun getChild(bulkQuery: IBulkQuery): IBulkQuery.Value { - return bulkQuery[child].map { childData -> childData!! } + return bulkQuery.query(child).map { childData -> childData!! 
} } override fun visitEntries(bulkQuery: IBulkQuery, visitor: (Long, KVEntryReference) -> Unit): IBulkQuery.Value { - return getChild(bulkQuery).mapBulk { it.visitEntries(bulkQuery, visitor) } + return getChild(bulkQuery).flatMap { it.visitEntries(bulkQuery, visitor) } } override fun visitChanges(oldNode: CPHamtNode?, shift: Int, visitor: CPHamtNode.IChangeVisitor, bulkQuery: IBulkQuery) { diff --git a/model-datastructure/src/jvmMain/kotlin/org/modelix/model/InMemoryModel.kt b/model-datastructure/src/jvmMain/kotlin/org/modelix/model/InMemoryModel.kt index 84bcdb4261..b6a32a3798 100644 --- a/model-datastructure/src/jvmMain/kotlin/org/modelix/model/InMemoryModel.kt +++ b/model-datastructure/src/jvmMain/kotlin/org/modelix/model/InMemoryModel.kt @@ -135,16 +135,16 @@ class InMemoryModel private constructor( val bulkQuery = NonCachingObjectStore(store).newBulkQuery() LOG.info { "Start loading model into memory" } val duration = measureTimeMillis { - bulkQuery.get(slowMapRef).onSuccess { slowMap -> + bulkQuery.query(slowMapRef).onReceive { slowMap -> slowMap!!.visitEntries(bulkQuery) { nodeId, nodeDataRef -> - bulkQuery.get(nodeDataRef).onSuccess { nodeData -> + bulkQuery.query(nodeDataRef).onReceive { nodeData -> if (nodeData != null) { fastMap.put(nodeId, nodeData) } } } } - bulkQuery.process() + bulkQuery.executeQuery() }.milliseconds LOG.info { "Done loading model into memory after ${duration.toDouble(DurationUnit.SECONDS)} s" } return InMemoryModel(branchId, slowMapRef, fastMap, useRoleIds) @@ -162,7 +162,7 @@ class InMemoryModel private constructor( LOG.debug { "Model update started" } fastMap.putAll(nodeMap) val duration = measureTimeMillis { - bulkQuery.map(listOf(slowMapRef, loadedMapRef)) { bulkQuery.get(it) }.onSuccess { + bulkQuery.flatMap(listOf(slowMapRef, loadedMapRef)) { bulkQuery.query(it) }.onReceive { val newSlowMap = it[0]!! val oldSlowMap = it[1]!! 
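                // Diff the new map against the old one and copy only added or changed node entries
                // into the in-memory fastMap.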
newSlowMap.visitChanges( @@ -170,7 +170,7 @@ class InMemoryModel private constructor( object : CPHamtNode.IChangeVisitor { override fun visitChangesOnly(): Boolean = false override fun entryAdded(key: Long, value: KVEntryReference) { - bulkQuery.get(value).onSuccess { nodeData -> + bulkQuery.query(value).onReceive { nodeData -> if (nodeData != null) { fastMap.put(key, nodeData) } @@ -184,7 +184,7 @@ class InMemoryModel private constructor( oldValue: KVEntryReference, newValue: KVEntryReference, ) { - bulkQuery.get(newValue).onSuccess { nodeData -> + bulkQuery.query(newValue).onReceive { nodeData -> if (nodeData != null) { fastMap.put(key, nodeData) } @@ -194,7 +194,7 @@ class InMemoryModel private constructor( bulkQuery, ) } - bulkQuery.process() + bulkQuery.executeQuery() }.milliseconds LOG.info { "Done updating model after ${duration.toDouble(DurationUnit.SECONDS)} s" } return InMemoryModel(branchId, slowMapRef, fastMap, useRoleIds) diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index 61fdc04c76..16c0c44908 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -372,7 +372,7 @@ class RepositoriesManager(val client: LocalModelClient) : IRepositoriesManager { fun emitObjects(entry: KVEntryReference<*>) { if (seenHashes.contains(entry.getHash())) return seenHashes.add(entry.getHash()) - bulkQuery.get(entry).onSuccess { + bulkQuery.query(entry).onReceive { val value = checkNotNull(it) { "No value received for ${entry.getHash()}" } // Use `send` instead of `trySend`, // because `trySend` fails if the channel capacity is full. 
@@ -391,7 +391,7 @@ class RepositoriesManager(val client: LocalModelClient) : IRepositoriesManager { } emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER)) LOG.debug("Starting to bulk query all objects.") - bulkQuery.process() + bulkQuery.executeQuery() } } val checkedHashObjectFlow = hashObjectFlow.checkObjectHashes() From 65ca207e02fa80c40394f05e0a27c551ed90b985 Mon Sep 17 00:00:00 2001 From: Sascha Lisson Date: Tue, 28 May 2024 09:50:25 +0200 Subject: [PATCH 3/3] perf(model-client): prefetch data to reduce the number of small requests --- .../model/client/GarbageFilteringStore.kt | 4 + .../modelix/model/client2/IModelClientV2.kt | 4 - .../model/client2/IModelClientV2Internal.kt | 39 +++ .../modelix/model/client2/ModelClientV2.kt | 27 +- .../org/modelix/model/KeyValueStoreCache.kt | 4 + .../org/modelix/model/client/AsyncStore.kt | 6 + .../model/client/RestWebModelClient.kt | 4 + .../org/modelix/model/client2/LazyLoading.kt | 173 +++++++------ .../org/modelix/model/IKeyValueStore.kt | 4 +- .../org/modelix/model/lazy/BulkQuery.kt | 236 ++++++++++++++---- .../kotlin/org/modelix/model/lazy/CLTree.kt | 47 +++- .../org/modelix/model/lazy/CLVersion.kt | 16 +- .../org/modelix/model/lazy/IBulkQuery.kt | 19 ++ .../model/lazy/IDeserializingKeyValueStore.kt | 7 +- .../modelix/model/lazy/IndirectObjectStore.kt | 4 + .../org/modelix/model/lazy/NonBulkQuery.kt | 4 + .../model/lazy/NonCachingObjectStore.kt | 19 ++ .../modelix/model/lazy/ObjectStoreCache.kt | 131 +++++++++- .../org/modelix/model/lazy/PrefetchCache.kt | 11 +- .../model/lazy/SynchronizedBulkQuery.kt | 78 ++++++ .../modelix/model/persistent/CPHamtSingle.kt | 2 +- .../modelix/model/persistent/MapBasedStore.kt | 7 +- .../server/api/v2/ImmutableObjectsStream.kt | 58 +++++ .../server/handlers/ModelReplicationServer.kt | 29 ++- .../model/server/store/LocalModelClient.kt | 4 + .../modelix/model/server/LazyLoadingTest.kt | 220 ++++++++++++---- 26 files changed, 937 insertions(+), 220 deletions(-) create mode 100644 model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2Internal.kt create mode 100644 model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/SynchronizedBulkQuery.kt create mode 100644 model-server-api/src/commonMain/kotlin/org/modelix/model/server/api/v2/ImmutableObjectsStream.kt diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client/GarbageFilteringStore.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client/GarbageFilteringStore.kt index d702bf280c..37abe85307 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client/GarbageFilteringStore.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client/GarbageFilteringStore.kt @@ -29,6 +29,10 @@ class GarbageFilteringStore(private val store: IKeyValueStore) : IKeyValueStoreW return if (pendingEntries.containsKey(key)) pendingEntries[key] else store[key] } + override fun getIfCached(key: String): String? 
{ + return if (pendingEntries.containsKey(key)) pendingEntries[key] else store.getIfCached(key) + } + override fun getPendingSize(): Int = store.getPendingSize() + pendingEntries.size override fun getWrapped(): IKeyValueStore = store diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt index ec474c0717..289b6fb3c3 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt @@ -86,8 +86,4 @@ interface IModelClientV2 { suspend fun query(branch: BranchReference, body: (IMonoStep) -> IMonoStep): R suspend fun query(repositoryId: RepositoryId, versionHash: String, body: (IMonoStep) -> IMonoStep): R - - suspend fun getObjects(repository: RepositoryId, keys: Sequence): Map - - suspend fun pushObjects(repository: RepositoryId, objects: Sequence>) } diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2Internal.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2Internal.kt new file mode 100644 index 0000000000..a04f7fa98b --- /dev/null +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2Internal.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.model.client2 + +import org.modelix.model.lazy.RepositoryId +import org.modelix.model.server.api.v2.ObjectHash +import org.modelix.model.server.api.v2.ObjectHashAndSerializedObject +import org.modelix.model.server.api.v2.SerializedObject + +/** + * Should only be used by Modelix components. + */ +interface IModelClientV2Internal : IModelClientV2 { + /** + * Required for lazy loading. + * Use [IModelClientV2.lazyLoadVersion] + */ + suspend fun getObjects(repository: RepositoryId, keys: Sequence): Map + + /** + * Required for lazy loading. 
+ * Use [IModelClientV2.lazyLoadVersion] + */ + suspend fun pushObjects(repository: RepositoryId, objects: Sequence) +} diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt index 7a441a0ff3..70bad660fd 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt @@ -68,6 +68,10 @@ import org.modelix.model.lazy.computeDelta import org.modelix.model.operations.OTBranch import org.modelix.model.persistent.HashUtil import org.modelix.model.persistent.MapBasedStore +import org.modelix.model.server.api.v2.ImmutableObjectsStream +import org.modelix.model.server.api.v2.ObjectHash +import org.modelix.model.server.api.v2.ObjectHashAndSerializedObject +import org.modelix.model.server.api.v2.SerializedObject import org.modelix.model.server.api.v2.VersionDelta import org.modelix.model.server.api.v2.VersionDeltaStream import org.modelix.model.server.api.v2.VersionDeltaStreamV2 @@ -83,7 +87,7 @@ class ModelClientV2( private val httpClient: HttpClient, val baseUrl: String, private var clientProvidedUserId: String?, -) : IModelClientV2, Closable { +) : IModelClientV2, IModelClientV2Internal, Closable { private var clientId: Int = 0 private var idGenerator: IIdGenerator = IdGeneratorDummy() private var serverProvidedUserId: String? = null @@ -265,27 +269,16 @@ class ModelClientV2( } } - override suspend fun getObjects(repository: RepositoryId, keys: Sequence): Map { - val response = httpClient.post { + override suspend fun getObjects(repository: RepositoryId, keys: Sequence): Map { + return httpClient.preparePost { url { takeFrom(baseUrl) appendPathSegments("repositories", repository.id, "objects", "getAll") } setBody(keys.joinToString("\n")) + }.execute { response -> + ImmutableObjectsStream.decode(response.bodyAsChannel()) } - - val content = response.bodyAsChannel() - val objects = HashMap() - while (true) { - val key = checkNotNull(content.readUTF8Line()) { "Empty line expected at the end of the stream" } - if (key == "") { - check(content.readUTF8Line() == null) { "Empty line is only allowed at the end of the stream" } - break - } - val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" } - objects[key] = value - } - return objects } override suspend fun push(branch: BranchReference, version: IVersion, baseVersion: IVersion?): IVersion { @@ -315,7 +308,7 @@ class ModelClientV2( } } - override suspend fun pushObjects(repository: RepositoryId, objects: Sequence>) { + override suspend fun pushObjects(repository: RepositoryId, objects: Sequence) { LOG.debug { "${clientId.toString(16)}.pushObjects($repository)" } objects.chunked(100_000).forEach { unsortedChunk -> // Entries are sorted to avoid deadlocks on the server side between transactions. diff --git a/model-client/src/jvmMain/kotlin/org/modelix/model/KeyValueStoreCache.kt b/model-client/src/jvmMain/kotlin/org/modelix/model/KeyValueStoreCache.kt index a4773adb5a..fc831d65b6 100644 --- a/model-client/src/jvmMain/kotlin/org/modelix/model/KeyValueStoreCache.kt +++ b/model-client/src/jvmMain/kotlin/org/modelix/model/KeyValueStoreCache.kt @@ -61,6 +61,10 @@ class KeyValueStoreCache(private val store: IKeyValueStore) : IKeyValueStoreWrap return getAll(setOf(key))[key] } + override fun getIfCached(key: String): String? 
{ + return cache[key] ?: store.getIfCached(key) + } + override fun getAll(keys: Iterable): Map { val remainingKeys = toStream(keys).collect(Collectors.toList()) val result: MutableMap = LinkedHashMap(16, 0.75.toFloat(), false) diff --git a/model-client/src/jvmMain/kotlin/org/modelix/model/client/AsyncStore.kt b/model-client/src/jvmMain/kotlin/org/modelix/model/client/AsyncStore.kt index 399b657002..cee7eca743 100644 --- a/model-client/src/jvmMain/kotlin/org/modelix/model/client/AsyncStore.kt +++ b/model-client/src/jvmMain/kotlin/org/modelix/model/client/AsyncStore.kt @@ -33,6 +33,12 @@ class AsyncStore(private val store: IKeyValueStore) : IKeyValueStoreWrapper { return store[key] } + override fun getIfCached(key: String): String? { + return synchronized(pendingWrites) { + pendingWrites[key] + } ?: store.getIfCached(key) + } + override fun getWrapped(): IKeyValueStore = store override fun getPendingSize(): Int = store.getPendingSize() + pendingWrites.size diff --git a/model-client/src/jvmMain/kotlin/org/modelix/model/client/RestWebModelClient.kt b/model-client/src/jvmMain/kotlin/org/modelix/model/client/RestWebModelClient.kt index bc3010d44f..0731f2fdb6 100644 --- a/model-client/src/jvmMain/kotlin/org/modelix/model/client/RestWebModelClient.kt +++ b/model-client/src/jvmMain/kotlin/org/modelix/model/client/RestWebModelClient.kt @@ -371,6 +371,10 @@ class RestWebModelClient @JvmOverloads constructor( return runBlocking { getA(key) } } + override fun getIfCached(key: String): String? { + return null // doesn't contain any caches + } + override suspend fun getA(key: String): String? { val isHash = HashUtil.isSha256(key) if (isHash) { diff --git a/model-client/src/jvmMain/kotlin/org/modelix/model/client2/LazyLoading.kt b/model-client/src/jvmMain/kotlin/org/modelix/model/client2/LazyLoading.kt index 831b26717c..2cae1f842e 100644 --- a/model-client/src/jvmMain/kotlin/org/modelix/model/client2/LazyLoading.kt +++ b/model-client/src/jvmMain/kotlin/org/modelix/model/client2/LazyLoading.kt @@ -1,71 +1,102 @@ -/* - * Copyright (c) 2024. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.modelix.model.client2 - -import kotlinx.coroutines.runBlocking -import org.modelix.model.IKeyListener -import org.modelix.model.IKeyValueStore -import org.modelix.model.IVersion -import org.modelix.model.lazy.BranchReference -import org.modelix.model.lazy.CLVersion -import org.modelix.model.lazy.ObjectStoreCache -import org.modelix.model.lazy.RepositoryId - -fun IModelClientV2.lazyLoadVersion(repositoryId: RepositoryId, versionHash: String, cacheSize: Int = 100_000): IVersion { - val store = ObjectStoreCache(ModelClientAsStore(this, repositoryId), cacheSize) - return CLVersion.loadFromHash(versionHash, store) -} - -suspend fun IModelClientV2.lazyLoadVersion(branchRef: BranchReference, cacheSize: Int = 100_000): IVersion { - return lazyLoadVersion(branchRef.repositoryId, pullHash(branchRef), cacheSize) -} - -class ModelClientAsStore(val client: IModelClientV2, val repositoryId: RepositoryId) : IKeyValueStore { - override fun get(key: String): String? { - return getAll(listOf(key))[key] - } - - override fun put(key: String, value: String?) { - TODO("Not yet implemented") - } - - override fun getAll(keys: Iterable): Map { - return runBlocking { - client.getObjects(repositoryId, keys.asSequence()) - } - } - - override fun putAll(entries: Map) { - TODO("Not yet implemented") - } - - override fun prefetch(key: String) { - TODO("Not yet implemented") - } - - override fun listen(key: String, listener: IKeyListener) { - TODO("Not yet implemented") - } - - override fun removeListener(key: String, listener: IKeyListener) { - TODO("Not yet implemented") - } - - override fun getPendingSize(): Int { - TODO("Not yet implemented") - } -} +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.model.client2 + +import kotlinx.coroutines.runBlocking +import org.modelix.model.IKeyListener +import org.modelix.model.IKeyValueStore +import org.modelix.model.IVersion +import org.modelix.model.lazy.BranchReference +import org.modelix.model.lazy.CLVersion +import org.modelix.model.lazy.CacheConfiguration +import org.modelix.model.lazy.ObjectStoreCache +import org.modelix.model.lazy.RepositoryId +import org.modelix.model.persistent.HashUtil + +/** + * This function loads parts of the model lazily while it is iterated and limits the amount of data that is cached on + * the client side. + * + * IModelClientV2#loadVersion eagerly loads the whole model. For large models this can be slow and requires lots of + * memory. + * To reduce the relative overhead of requests to the server, the lazy loading algorithm tries to predict which nodes + * are required next and fill a "prefetch cache" by using "free capacity" of the regular requests. That means, + * the number of requests doesn't change by this prefetching, but small requests are filled to up to their limit with + * additional prefetch requests. 
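+ *
+ * A typical call-site looks roughly like this (client is an initialized [IModelClientV2], the repository
+ * name is a placeholder, and the call has to run inside a coroutine; see [CacheConfiguration] for the
+ * available tuning options):
+ *
+ * ```
+ * val branchRef = RepositoryId("my-repo").getBranchReference()
+ * val version = client.lazyLoadVersion(branchRef, CacheConfiguration())
+ * val rootNode = TreePointer(version.getTree()).getRootNode()
+ * rootNode.getDescendants(true).count() // objects are fetched from the server while iterating
+ * ```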
+ */ +fun IModelClientV2.lazyLoadVersion(repositoryId: RepositoryId, versionHash: String, config: CacheConfiguration = CacheConfiguration()): IVersion { + val store = ObjectStoreCache(ModelClientAsStore(this, repositoryId), config) + return CLVersion.loadFromHash(versionHash, store) +} + +/** + * An overload of [IModelClientV2.lazyLoadVersion] that reads the current version hash of the branch from the server and + * then loads that version with lazy loading support. + */ +suspend fun IModelClientV2.lazyLoadVersion(branchRef: BranchReference, config: CacheConfiguration = CacheConfiguration()): IVersion { + return lazyLoadVersion(branchRef.repositoryId, pullHash(branchRef), config) +} + +private class ModelClientAsStore(client: IModelClientV2, val repositoryId: RepositoryId) : IKeyValueStore { + private val client: IModelClientV2Internal = client as IModelClientV2Internal + + override fun get(key: String): String? { + return getAll(listOf(key))[key] + } + + override fun getIfCached(key: String): String? { + return null + } + + override fun put(key: String, value: String?) { + putAll(mapOf(key to value)) + } + + override fun getAll(keys: Iterable): Map { + return runBlocking { + client.getObjects(repositoryId, keys.asSequence()) + } + } + + override fun putAll(entries: Map) { + runBlocking { + client.pushObjects( + repositoryId, + entries.asSequence().map { (key, value) -> + require(HashUtil.isSha256(key) && value != null) { "Only immutable objects are allowed: $key -> $value" } + key to value + }, + ) + } + } + + override fun prefetch(key: String) { + throw UnsupportedOperationException() + } + + override fun listen(key: String, listener: IKeyListener) { + throw UnsupportedOperationException() + } + + override fun removeListener(key: String, listener: IKeyListener) { + throw UnsupportedOperationException() + } + + override fun getPendingSize(): Int { + return 0 + } +} diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/IKeyValueStore.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/IKeyValueStore.kt index 65ba45857a..ef07f5c896 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/IKeyValueStore.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/IKeyValueStore.kt @@ -16,12 +16,14 @@ package org.modelix.model import org.modelix.model.lazy.BulkQuery +import org.modelix.model.lazy.BulkQueryConfiguration import org.modelix.model.lazy.IBulkQuery import org.modelix.model.lazy.IDeserializingKeyValueStore interface IKeyValueStore { - fun newBulkQuery(deserializingCache: IDeserializingKeyValueStore): IBulkQuery = BulkQuery(deserializingCache) + fun newBulkQuery(deserializingCache: IDeserializingKeyValueStore, config: BulkQueryConfiguration): IBulkQuery = BulkQuery(deserializingCache, config) operator fun get(key: String): String? + fun getIfCached(key: String): String? suspend fun getA(key: String): String? = get(key) fun put(key: String, value: String?) 
fun getAll(keys: Iterable): Map diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/BulkQuery.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/BulkQuery.kt index a6134beb16..de761bf10b 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/BulkQuery.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/BulkQuery.kt @@ -16,37 +16,64 @@ package org.modelix.model.lazy import org.modelix.model.persistent.IKVValue -import kotlin.jvm.Synchronized /** * Not thread safe */ -class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { - companion object { - val BATCH_SIZE = 5000 +class BulkQuery(private val store: IDeserializingKeyValueStore, config: BulkQueryConfiguration) : IBulkQuery { + private val queue: MutableMap> = LinkedHashMap() + private var processing = false + private val batchSize: Int = config.requestBatchSize + private val prefetchSize: Int = config.prefetchBatchSize ?: (this.batchSize / 2) + private val prefetchQueueSizeLimit: Int = (this.prefetchSize * 10).coerceAtLeast(this.batchSize * 2) + private val prefetchQueue: PrefetchQueue = PrefetchQueue(this, prefetchQueueSizeLimit).also { + it.requestFilter = { !queue.contains(it.getHash()) } } - private var queue: MutableList, (IKVValue?) -> Unit>> = ArrayList() - private var processing = false - protected fun executeBulkQuery(refs: Iterable>): Map { - val refsMap = refs.associateBy { it.getHash() } - val result = HashMap() - result += refs.filter { !it.isWritten() }.map { it.getHash() to it.getValue(store) } - val keysToQuery = refs.filter { it.isWritten() }.map { it.getHash() } - val queriedValues = store.getAll(keysToQuery) { key, serialized -> refsMap[key]!!.getDeserializer()(serialized) } - result += keysToQuery.zip(queriedValues) - return result + init { + require(this.prefetchSize <= this.batchSize) { "prefetch size ${this.prefetchSize} is greater than the batch size ${this.batchSize}" } + } + + private fun getValueInstance(ref: KVEntryReference): Value? { + return queue[ref.getHash()]?.let { it.value as Value? } + ?: (prefetchQueue.getValueInstance(ref) as Value?) } - fun query(key: KVEntryReference, callback: (T) -> Unit) { - if (queue.size >= BATCH_SIZE && !processing) executeQuery() - queue.add(Pair(key as KVEntryReference, callback as (IKVValue?) -> Unit)) + protected fun executeBulkQuery(regular: List>, prefetch: List>): Map { + return store.getAll(regular, prefetch) } override fun query(hash: KVEntryReference): IBulkQuery.Value { - val result = Value() - query(hash) { value: T? -> result.success(value) } - return result + if (!hash.isWritten()) return constant(hash.getValue(store)) + + val cachedValue = store.getIfCached(hash.getHash(), hash.getDeserializer(), prefetchQueue.isLoadingGoal()) + if (cachedValue != null) { + return constant(cachedValue) + } + + val existingValue = getValueInstance(hash) + if (existingValue != null && existingValue.isDone()) return existingValue + + if (prefetchQueue.isLoadingGoal()) { + prefetchQueue.addRequest(hash, getValueInstance(hash) as Value? ?: Value()) + return DummyValue() // transitive objects are loaded when the prefetch queue is processed the next time + } else { + if (queue.size >= batchSize && !processing) executeQuery() + + val existingQueueElement = queue[hash.getHash()] as QueueElement? + val result = if (existingQueueElement != null) { + existingQueueElement.value + } else { + val result: Value = getValueInstance(hash) as Value? 
?: Value() + queue.put(hash.getHash(), QueueElement(hash, result)) + result + } + return result + } + } + + override fun offerPrefetch(goal: IPrefetchGoal) { + prefetchQueue.addGoal(goal) } override fun constant(value: T): IBulkQuery.Value { @@ -60,22 +87,26 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { processing = true try { while (queue.isNotEmpty()) { - val currentRequests: List, (IKVValue?) -> Unit>> - if (queue.size > BATCH_SIZE) { - // The callback of a request usually enqueues new request until it reaches the leafs of the - // data structure. By executing the latest (instead of the oldest) request we basically do a depth - // first traversal which keeps the maximum size of the queue smaller. - currentRequests = ArrayList(queue.subList(queue.size - BATCH_SIZE, queue.size)) - for (i in 1..BATCH_SIZE) queue.removeLast() - } else { - currentRequests = queue - queue = ArrayList() + // The callback of a request usually enqueues new requests until it reaches the leafs of the + // data structure. By executing the latest (instead of the oldest) request we basically do a depth + // first traversal which keeps the maximum size of the queue smaller. + val regularRequests: List, Value<*>>> = queue.entries.tailSequence(batchSize) + .map { it.value.hash to it.value.value } + .toList() + if (queue.size < prefetchSize) { + prefetchQueue.fillRequestsQueue(prefetchSize - regularRequests.size) } + val prefetchRequests: List, Value<*>>> = prefetchQueue.getRequests(prefetchSize - regularRequests.size) + regularRequests.forEach { queue.remove(it.first.getHash()) } + + val allRequests: List, Value<*>>> = regularRequests + prefetchRequests + val entries: Map = executeBulkQuery( - currentRequests.map { obj -> obj.first }.distinct(), + regularRequests.asSequence().map { obj -> obj.first }.toSet().toList(), + prefetchRequests.asSequence().map { obj -> obj.first }.toSet().toList(), ) - for (request in currentRequests) { - request.second(entries[request.first.getHash()]) + for (request in allRequests) { + (request.second as Value).success(entries[request.first.getHash()]) } } } finally { @@ -108,8 +139,13 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { return result } + private class QueueElement( + val hash: KVEntryReference, + val value: Value, + ) + inner class Value : IBulkQuery.Value { - private var handlers: MutableList<(T) -> Unit>? = ArrayList() + private var handlers: MutableList<(T) -> Unit>? = null private var value: T? = null private var done = false @@ -119,23 +155,25 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { done = true } - @Synchronized + fun isDone() = done + fun success(value: T) { check(!done) { "Value is already set" } this.value = value done = true - for (handler in handlers!!) { - handler(value) - } + handlers?.forEach { it(value) } handlers = null } - @Synchronized override fun onReceive(handler: (T) -> Unit) { if (done) { handler(value as T) } else { + if (handlers == null) handlers = ArrayList(1) handlers!!.add(handler) + check(handlers.let { it == null || it.size < 1_000 }) { + "Too many handlers" + } } } @@ -147,16 +185,128 @@ class BulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery { return value!! 
} - override fun map(handler: (T) -> R): IBulkQuery.Value { + override fun map(transformation: (T) -> R): IBulkQuery.Value { val result = Value() - onReceive { v -> result.success(handler(v)) } + onReceive { v -> result.success(transformation(v)) } return result } - override fun flatMap(handler: (T) -> IBulkQuery.Value): IBulkQuery.Value { + override fun flatMap(transformation: (T) -> IBulkQuery.Value): IBulkQuery.Value { val result = Value() - onReceive { v -> handler(v).onReceive { value -> result.success(value) } } + onReceive { v -> transformation(v).onReceive { value -> result.success(value) } } return result } } + + class DummyValue : IBulkQuery.Value { + override fun executeQuery(): E { + throw UnsupportedOperationException() + } + + override fun flatMap(handler: (E) -> IBulkQuery.Value): IBulkQuery.Value = DummyValue() + + override fun map(handler: (E) -> R): IBulkQuery.Value = DummyValue() + + override fun onReceive(handler: (E) -> Unit) {} + } +} + +private fun Sequence.tailSequence(size: Int, tailSize: Int): Sequence { + if (size <= tailSize) return this + if (tailSize <= 0) return emptySequence() + return drop(size - tailSize) +} + +private fun Collection.tailSequence(tailSize: Int): Sequence { + return asSequence().tailSequence(size, tailSize) +} + +interface IPrefetchGoal { + fun loadRequest(bulkQuery: IBulkQuery) +} + +private class PrefetchQueue(val bulkQuery: IBulkQuery, val queueSizeLimit: Int) { + private val goals: MutableMap = LinkedHashMap() + private var previousRequests: MutableMap> = LinkedHashMap() + private var nextRequests: MutableMap> = LinkedHashMap() + private var currentGoal: QueuedGoal? = null + private var anyEntryRequested = false + var requestFilter: (KVEntryReference<*>) -> Boolean = { true } + + fun isLoadingGoal() = currentGoal != null + + fun fillRequestsQueue(requestLimit: Int) { + if (requestLimit <= 0) return + + previousRequests = nextRequests + nextRequests = LinkedHashMap() + + for (goal in goals.values.toList().sortedByDescending { it.prefetchLevel }.asReversed()) { + if (nextRequests.size >= requestLimit) break + executeRequests(goal) + } + } + + fun getRequests(limit: Int): List, BulkQuery.Value<*>>> { + return nextRequests.entries.tailSequence(limit) + .map { it.value.hash to it.value.result } + .toList() + } + + fun addGoal(goal: IPrefetchGoal) { + val newLevel = currentGoal?.prefetchLevel?.let { it + 1 } ?: 0 + // remove and re-add to move it to the end of the queue + val queuedGoal = goals.remove(goal)?.also { it.prefetchLevel = minOf(it.prefetchLevel, newLevel) } ?: QueuedGoal(goal, newLevel) + goals[goal] = queuedGoal + trimQueue() + } + + fun addRequest(hash: KVEntryReference, result: BulkQuery.Value) { + addRequest(hash, checkNotNull(currentGoal) { "Not loading any goal" }, result) + } + + private fun addRequest(hash: KVEntryReference, goal: QueuedGoal, result: BulkQuery.Value) { + anyEntryRequested = true + + val request = (previousRequests[hash.getHash()] ?: nextRequests[hash.getHash()])?.also { + require(result == it.result) + it.prefetchLevel = minOf(it.prefetchLevel, goal.prefetchLevel) + } ?: PrefetchRequest(hash, result, goal.prefetchLevel) + + if (!request.result.isDone() && requestFilter(request.hash)) { + nextRequests[hash.getHash()] = request + } + trimQueue() + } + + fun getValueInstance(hash: KVEntryReference): BulkQuery.Value? 
{ + return ((nextRequests[hash.getHash()] ?: previousRequests[hash.getHash()]) as PrefetchRequest?)?.result + } + + private fun trimQueue() { + if (goals.size > queueSizeLimit * 2) { + val toRemove = goals.entries.sortedBy { it.value.prefetchLevel }.drop(goals.size - queueSizeLimit).map { it.key } + toRemove.forEach { goals.remove(it) } + } + } + + private fun executeRequests(goal: QueuedGoal) { + val previousGoal = currentGoal + val previousAnyEntryRequested = anyEntryRequested + try { + currentGoal = goal + anyEntryRequested = false + goal.goal.loadRequest(bulkQuery) + if (!anyEntryRequested) { + goals.remove(goal.goal) + } + } finally { + anyEntryRequested = previousAnyEntryRequested + currentGoal = previousGoal + } + } + + private inner class QueuedGoal(val goal: IPrefetchGoal, var prefetchLevel: Int) + + private inner class PrefetchRequest(val hash: KVEntryReference, val result: BulkQuery.Value, var prefetchLevel: Int) } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLTree.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLTree.kt index 902d7ac27f..3465c99bf9 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLTree.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLTree.kt @@ -30,7 +30,6 @@ import org.modelix.model.api.PNodeReference import org.modelix.model.api.tryResolve import org.modelix.model.lazy.COWArrays.insert import org.modelix.model.lazy.COWArrays.remove -import org.modelix.model.lazy.RepositoryId.Companion.random import org.modelix.model.persistent.CPHamtInternal import org.modelix.model.persistent.CPHamtNode import org.modelix.model.persistent.CPNode @@ -54,7 +53,7 @@ class CLTree : ITree, IBulkTree { var repositoryId = repositoryId_ if (data == null) { if (repositoryId == null) { - repositoryId = random() + repositoryId = RepositoryId.random() } val root = CPNode.create( 1, @@ -77,7 +76,7 @@ class CLTree : ITree, IBulkTree { private constructor(treeId_: String, idToHash: CPHamtNode, store: IDeserializingKeyValueStore, usesRoleIds: Boolean) { var treeId: String? 
= treeId_ if (treeId == null) { - treeId = random().id + treeId = RepositoryId.random().id } data = CPTree(treeId, KVEntryReference(idToHash), usesRoleIds) this.store = store @@ -95,6 +94,7 @@ class CLTree : ITree, IBulkTree { return (nodesMap ?: return 0L).calculateSize(store.newBulkQuery()).executeQuery() } + @Deprecated("BulkQuery is now responsible for prefetching") fun prefetchAll() { store.prefetch(hash) } @@ -315,8 +315,23 @@ class CLTree : ITree, IBulkTree { } override fun getAllChildren(parentId: Long): Iterable { - val children = getChildren(resolveElement(parentId)!!, store.newBulkQuery()).executeQuery() - return children.map { it.id } + return getAllChildren(parentId, store.newBulkQuery()).executeQuery() + } + + fun getAllChildren(parentId: Long, bulkQuery: IBulkQuery): IBulkQuery.Value> { + return resolveElement(parentId, bulkQuery).map { + it?.childrenIdArray?.asIterable() ?: emptyList() + } + } + + private data class PrefetchNodeGoal(val tree: CLTree, val nodeId: Long) : IPrefetchGoal { + override fun loadRequest(bulkQuery: IBulkQuery) { + tree.getAllChildren(nodeId, bulkQuery).map { it.forEach { tree.resolveElement(it, bulkQuery) } } + } + + override fun toString(): String { + return nodeId.toString(16) + } } override fun getDescendants(root: Long, includeSelf: Boolean): Iterable { @@ -443,7 +458,7 @@ class CLTree : ITree, IBulkTree { override fun visitChanges(oldVersion: ITree, visitor: ITreeChangeVisitor) { val bulkQuery = store.newBulkQuery() visitChanges(oldVersion, visitor, bulkQuery) - (bulkQuery as? BulkQuery)?.executeQuery() + bulkQuery.executeQuery() } fun visitChanges(oldVersion: ITree, visitor: ITreeChangeVisitor, bulkQuery: IBulkQuery) { @@ -594,7 +609,23 @@ class CLTree : ITree, IBulkTree { return if (hash == null) { query.constant(null) } else { - query.query(hash) + query.query(hash).also { + it.onReceive { node -> + if (node == null) return@onReceive + val children: LongArray = node.childrenIdArray + if (children.isNotEmpty()) { + children.reversedArray().forEach { + query.offerPrefetch(PrefetchNodeGoal(this, it)) + } + } + if (node.parentId != 0L) { + query.offerPrefetch(PrefetchNodeGoal(this, node.parentId)) + } + node.referenceTargets.asSequence().filter { it.isLocal }.forEach { target -> + query.offerPrefetch(PrefetchNodeGoal(this, target.elementId)) + } + } + } } } @@ -626,7 +657,7 @@ class CLTree : ITree, IBulkTree { } } - private fun getChildren(node: CPNode, bulkQuery: IBulkQuery): IBulkQuery.Value> { + private fun getChildren(node: CPNode, bulkQuery: IBulkQuery): IBulkQuery.Value> { return resolveElements(node.getChildrenIds().toList(), bulkQuery).map { elements -> elements } } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt index f71a5b17c5..483ec26759 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt @@ -369,7 +369,7 @@ private fun computeDelta(keyValueStore: IKeyValueStore, versionHash: String, bas ) v1 = v2 } - (bulkQuery as? BulkQuery)?.executeQuery() + bulkQuery.executeQuery() } val oldEntries: Map = trackAccessedEntries(keyValueStore) { store -> if (baseVersionHash == null) return@trackAccessedEntries @@ -391,7 +391,7 @@ private fun computeDelta(keyValueStore: IKeyValueStore, versionHash: String, bas } } - (bulkQuery as? 
BulkQuery)?.executeQuery() + bulkQuery.executeQuery() } return oldAndNewEntries - oldEntries.keys } @@ -406,8 +406,8 @@ private fun trackAccessedEntries(store: IKeyValueStore, body: (IDeserializingKey private class AccessTrackingStore(val store: IKeyValueStore) : IKeyValueStore { val accessedEntries: MutableMap = LinkedHashMap() - override fun newBulkQuery(deserializingCache: IDeserializingKeyValueStore): IBulkQuery { - return store.newBulkQuery(deserializingCache) + override fun newBulkQuery(deserializingCache: IDeserializingKeyValueStore, config: BulkQueryConfiguration): IBulkQuery { + return store.newBulkQuery(deserializingCache, config) } override fun get(key: String): String? { @@ -416,6 +416,14 @@ private class AccessTrackingStore(val store: IKeyValueStore) : IKeyValueStore { return value } + override fun getIfCached(key: String): String? { + val value = store.getIfCached(key) + if (value != null) { + accessedEntries[key] = value + } + return value + } + override fun put(key: String, value: String?) { TODO("Not yet implemented") } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IBulkQuery.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IBulkQuery.kt index ce0e0e37a9..3a64bc0560 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IBulkQuery.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IBulkQuery.kt @@ -18,6 +18,7 @@ package org.modelix.model.lazy import org.modelix.model.persistent.IKVValue interface IBulkQuery { + fun offerPrefetch(key: IPrefetchGoal) fun executeQuery() fun flatMap(input: Iterable, f: (I) -> Value): Value> fun constant(value: T): Value @@ -29,3 +30,21 @@ interface IBulkQuery { fun onReceive(handler: (T) -> Unit) } } + +open class BulkQueryConfiguration { + /** + * The maximum number of objects that is requested in one request. + */ + var requestBatchSize: Int = defaultRequestBatchSize + + /** + * If a request contains fewer objects than [prefetchBatchSize], it is filled up with additional objects that are + * predicted to be required in the future. + */ + var prefetchBatchSize: Int? = defaultPrefetchBatchSize + + companion object { + var defaultRequestBatchSize: Int = 5_000 + var defaultPrefetchBatchSize: Int? = null + } +} diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IDeserializingKeyValueStore.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IDeserializingKeyValueStore.kt index 2a85672623..3391d1c5d5 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IDeserializingKeyValueStore.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IDeserializingKeyValueStore.kt @@ -16,13 +16,18 @@ package org.modelix.model.lazy import org.modelix.model.IKeyValueStore +import org.modelix.model.persistent.IKVValue interface IDeserializingKeyValueStore { fun newBulkQuery(): IBulkQuery = newBulkQuery(this) - fun newBulkQuery(wrapper: IDeserializingKeyValueStore): IBulkQuery = keyValueStore.newBulkQuery(wrapper) + fun newBulkQuery(wrapper: IDeserializingKeyValueStore, config: BulkQueryConfiguration? = null): IBulkQuery = keyValueStore.newBulkQuery(wrapper, config ?: BulkQueryConfiguration()) val keyValueStore: IKeyValueStore operator fun get(hash: String, deserializer: (String) -> T): T? + fun getIfCached(hash: String, deserializer: (String) -> T, isPrefetch: Boolean): T? 
fun getAll(hash: Iterable, deserializer: (String, String) -> T): Iterable + fun getAll(regular: List>, prefetch: List>): Map = throw UnsupportedOperationException() fun put(hash: String, deserialized: Any, serialized: String) + + @Deprecated("BulkQuery is now responsible for prefetching") fun prefetch(hash: String) } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IndirectObjectStore.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IndirectObjectStore.kt index 24d15d4cd1..65b44484df 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IndirectObjectStore.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/IndirectObjectStore.kt @@ -26,6 +26,10 @@ abstract class IndirectObjectStore : IDeserializingKeyValueStore { return getStore().get(hash, deserializer) } + override fun getIfCached(hash: String, deserializer: (String) -> T, isPrefetch: Boolean): T? { + return getStore().getIfCached(hash, deserializer, isPrefetch) + } + override fun getAll(hash: Iterable, deserializer: (String, String) -> T): Iterable { return getStore().getAll(hash, deserializer) } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonBulkQuery.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonBulkQuery.kt index 0d70318f9f..39e1c27ba9 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonBulkQuery.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonBulkQuery.kt @@ -27,6 +27,10 @@ class NonBulkQuery(private val store: IDeserializingKeyValueStore) : IBulkQuery return Value(value) } + override fun offerPrefetch(key: IPrefetchGoal) { + // Since no real bulk queries are executed, prefetching doesn't provide any benefit. + } + override fun query(hash: KVEntryReference): IBulkQuery.Value { return constant(hash.getValue(store)) } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonCachingObjectStore.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonCachingObjectStore.kt index ef08a4f6fd..def08aed1f 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonCachingObjectStore.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/NonCachingObjectStore.kt @@ -17,6 +17,7 @@ package org.modelix.model.lazy import org.modelix.model.IKeyValueStore +import org.modelix.model.persistent.IKVValue class NonCachingObjectStore(override val keyValueStore: IKeyValueStore) : IDeserializingKeyValueStore { @@ -29,10 +30,28 @@ class NonCachingObjectStore(override val keyValueStore: IKeyValueStore) : IDeser } } + override fun getAll( + regular: List>, + prefetch: List>, + ): Map { + val allRequests = regular.asSequence() + prefetch.asSequence() + val hashes = allRequests.map { it.getHash() } + val deserializers = allRequests.associate { it.getHash() to it.getDeserializer() } + val serialized: Map = keyValueStore.getAll(hashes.asIterable()) + return serialized.mapValues { (hash, serializedValue) -> + val value = checkNotNull(serializedValue) { "Entry not found: $hash" } + deserializers[hash]!!(value) + } + } + override fun get(hash: String, deserializer: (String) -> T): T? { return keyValueStore.get(hash)?.let(deserializer) } + override fun getIfCached(hash: String, deserializer: (String) -> T, isPrefetch: Boolean): T? 
{ + return keyValueStore.getIfCached(hash)?.let(deserializer) + } + override fun put(hash: String, deserialized: Any, serialized: String) { keyValueStore.put(hash, serialized) } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/ObjectStoreCache.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/ObjectStoreCache.kt index 389eaf4988..e9b77090ef 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/ObjectStoreCache.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/ObjectStoreCache.kt @@ -16,18 +16,53 @@ package org.modelix.model.lazy import org.modelix.model.IKeyValueStore -import org.modelix.model.createLRUMap +import org.modelix.model.persistent.IKVValue import kotlin.jvm.JvmOverloads +import kotlin.jvm.Synchronized -class ObjectStoreCache @JvmOverloads constructor(override val keyValueStore: IKeyValueStore, cacheSize: Int = 100_000) : IDeserializingKeyValueStore { - private val cache: MutableMap = createLRUMap(cacheSize) +class CacheConfiguration : BulkQueryConfiguration() { + /** + * Size of the cache for regularly requested objects. + */ + var cacheSize: Int = defaultCacheSize + /** + * Size of the separate cache for prefetched objects. + * Objects are prefetched based on a prediction of what data might be needed next, but they may not be actually + * used at all. To avoid eviction of regular objects, there are two separate caches. + */ + var prefetchCacheSize: Int? = defaultPrefetchCacheSize + fun getPrefetchCacheSize() = prefetchCacheSize ?: cacheSize + + companion object { + var defaultCacheSize: Int = 100_000 + var defaultPrefetchCacheSize: Int? = null + } +} + +class ObjectStoreCache @JvmOverloads constructor( + override val keyValueStore: IKeyValueStore, + val config: CacheConfiguration = CacheConfiguration(), +) : IDeserializingKeyValueStore { + private val regularCache = LRUCache(config.cacheSize) + private val prefetchCache = LRUCache(config.getPrefetchCacheSize()) + private var bulkQuery: Pair? = null + + @Synchronized + override fun newBulkQuery(wrapper: IDeserializingKeyValueStore, config: BulkQueryConfiguration?): IBulkQuery { + if (bulkQuery?.takeIf { it.second == wrapper } == null) { + bulkQuery = keyValueStore.newBulkQuery(wrapper, config ?: this.config).asSynchronized() to wrapper + } + return bulkQuery!!.first + } + + @Synchronized override fun getAll(hashes_: Iterable, deserializer: (String, String) -> T): Iterable { val hashes = hashes_.toList() - val result: MutableMap = HashMap() + val result: MutableMap = LinkedHashMap() val nonCachedHashes: MutableList = ArrayList(hashes.size) for (hash in hashes) { - val deserialized = cache[hash] as T? + val deserialized = (regularCache[hash] ?: prefetchCache[hash]) as T? if (deserialized == null) { nonCachedHashes.add(hash) } else { @@ -40,7 +75,7 @@ class ObjectStoreCache @JvmOverloads constructor(override val keyValueStore: IKe result[hash] = null } else { val deserialized: T? = deserializer(hash, serialized) - cache[hash] = deserialized ?: NULL + regularCache[hash] = deserialized ?: NULL result[hash] = deserialized } } @@ -48,23 +83,70 @@ class ObjectStoreCache @JvmOverloads constructor(override val keyValueStore: IKe return hashes.map { key: String? 
-> result[key] as T }.asIterable() } + @Synchronized + override fun getAll( + regular: List>, + prefetch: List>, + ): Map { + val regularHashes = regular.asSequence().map { it.getHash() }.toSet() + val allRequests = regular.asSequence().plus(prefetch.asSequence()) + val deserializers = allRequests.associate { it.getHash() to it.getDeserializer() } + val hashes = allRequests.map { it.getHash() }.toList() + val result: MutableMap = LinkedHashMap() + val nonCachedHashes: MutableList = ArrayList(hashes.size) + for (hash in hashes) { + val deserialized = regularCache.get(hash, updatePosition = regularHashes.contains(hash)) ?: prefetchCache.get(hash) + if (deserialized == null) { + nonCachedHashes.add(hash) + } else { + result[hash] = if (deserialized === NULL) null else deserialized as T? + } + } + if (nonCachedHashes.isNotEmpty()) { + for ((hash, serialized) in keyValueStore.getAll(nonCachedHashes)) { + if (serialized == null) { + result[hash] = null + } else { + val deserialized = deserializers[hash]!!(serialized) + (if (regularHashes.contains(hash)) regularCache else prefetchCache)[hash] = deserialized ?: NULL + result[hash] = deserialized + } + } + } + return result + } + + @Synchronized override fun get(hash: String, deserializer: (String) -> T): T? { - var deserialized = cache[hash] as T? + return get(hash, deserializer, false, false) + } + + private fun get(hash: String, deserializer: (String) -> T, ifCached: Boolean, isPrefetch: Boolean): T? { + var deserialized = (regularCache.get(hash, updatePosition = !isPrefetch) ?: prefetchCache.get(hash)) as T? if (deserialized == null) { - val serialized = keyValueStore[hash] ?: return null + val serialized = (if (ifCached) keyValueStore.getIfCached(hash) else keyValueStore[hash]) ?: return null deserialized = deserializer(serialized) - cache[hash] = deserialized ?: NULL + (if (isPrefetch) prefetchCache else regularCache)[hash] = deserialized ?: NULL } return if (deserialized === NULL) null else deserialized } + @Synchronized + override fun getIfCached(hash: String, deserializer: (String) -> T, isPrefetch: Boolean): T? { + return get(hash, deserializer, true, isPrefetch) + } + + @Synchronized override fun put(hash: String, deserialized: Any, serialized: String) { keyValueStore.put(hash, serialized) - cache[hash] = deserialized ?: NULL + regularCache[hash] = deserialized ?: NULL + prefetchCache.remove(hash) } + @Synchronized fun clearCache() { - cache.clear() + regularCache.clear() + prefetchCache.clear() } override fun prefetch(hash: String) { @@ -75,3 +157,30 @@ class ObjectStoreCache @JvmOverloads constructor(override val keyValueStore: IKe private val NULL = Any() } } + +private class LRUCache(val maxSize: Int) { + private val map: MutableMap = LinkedHashMap() + + operator fun set(key: K, value: V) { + map.remove(key) + map[key] = value + while (map.size > maxSize) map.remove(map.iterator().next().key) + } + + operator fun get(key: K, updatePosition: Boolean = true): V? 
{ + return map[key]?.also { value -> + if (updatePosition) { + map.remove(key) + map[key] = value as V + } + } + } + + fun remove(key: K) { + map.remove(key) + } + + fun clear() { + map.clear() + } +} diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/PrefetchCache.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/PrefetchCache.kt index 32bb01fea3..27a79b3119 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/PrefetchCache.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/PrefetchCache.kt @@ -22,6 +22,7 @@ import org.modelix.model.api.ITree * This guarantees that after a prefetch there are no more request required. * Not thread safe. */ +@Deprecated("BulkQuery is now responsible for prefetching") class PrefetchCache(private val store: IDeserializingKeyValueStore) : IDeserializingKeyValueStore { init { if (store is ContextIndirectCache) throw IllegalArgumentException() @@ -33,15 +34,23 @@ class PrefetchCache(private val store: IDeserializingKeyValueStore) : IDeseriali override val keyValueStore: IKeyValueStore = store.keyValueStore override fun get(hash: String, deserializer: (String) -> T): T? { + return get(hash, deserializer, false, false) + } + + private fun get(hash: String, deserializer: (String) -> T, ifCached: Boolean, isPrefetch: Boolean): T? { return if (entries.containsKey(hash)) { entries[hash] as T? } else { - val value = store.get(hash, deserializer) + val value = if (ifCached) store.getIfCached(hash, deserializer, isPrefetch) else store.get(hash, deserializer) entries[hash] = value value } } + override fun getIfCached(hash: String, deserializer: (String) -> T, isPrefetch: Boolean): T? { + return get(hash, deserializer, true, isPrefetch) + } + override fun getAll(hashes: Iterable, deserializer: (String, String) -> T): Iterable { val missingHashes = hashes.filterNot { entries.containsKey(it) } val missingValues = store.getAll(missingHashes, deserializer).toList() diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/SynchronizedBulkQuery.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/SynchronizedBulkQuery.kt new file mode 100644 index 0000000000..f8a301920c --- /dev/null +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/SynchronizedBulkQuery.kt @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.modelix.model.lazy + +import org.modelix.model.api.runSynchronized +import org.modelix.model.persistent.IKVValue +import kotlin.jvm.Synchronized + +class SynchronizedBulkQuery(val nonThreadSafeQuery: IBulkQuery) : IBulkQuery { + @Synchronized + override fun constant(value: T): IBulkQuery.Value { + return nonThreadSafeQuery.constant(value) + } + + @Synchronized + override fun offerPrefetch(key: IPrefetchGoal) { + return nonThreadSafeQuery.offerPrefetch(key) + } + + @Synchronized + override fun executeQuery() { + return nonThreadSafeQuery.executeQuery() + } + + @Synchronized + override fun flatMap(input: Iterable, f: (I) -> IBulkQuery.Value): IBulkQuery.Value> { + return nonThreadSafeQuery.flatMap(input, f) + } + + @Synchronized + override fun query(hash: KVEntryReference): IBulkQuery.Value { + return nonThreadSafeQuery.query(hash) + } + + inner class Value(val nonThreadSafeValue: IBulkQuery.Value) : IBulkQuery.Value { + override fun executeQuery(): E { + runSynchronized(this@SynchronizedBulkQuery) { + return nonThreadSafeValue.executeQuery() + } + } + + override fun flatMap(handler: (E) -> IBulkQuery.Value): IBulkQuery.Value { + runSynchronized(this@SynchronizedBulkQuery) { + return nonThreadSafeValue.flatMap(handler) + } + } + + override fun map(handler: (E) -> R): IBulkQuery.Value { + runSynchronized(this@SynchronizedBulkQuery) { + return nonThreadSafeValue.map(handler) + } + } + + override fun onReceive(handler: (E) -> Unit) { + runSynchronized(this@SynchronizedBulkQuery) { + return nonThreadSafeValue.onReceive(handler) + } + } + } +} + +fun IBulkQuery.asSynchronized(): IBulkQuery { + return if (this is SynchronizedBulkQuery) this else SynchronizedBulkQuery(this) +} diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtSingle.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtSingle.kt index 66dec7fdd8..4b03245535 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtSingle.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPHamtSingle.kt @@ -109,7 +109,7 @@ class CPHamtSingle( } fun getChild(bulkQuery: IBulkQuery): IBulkQuery.Value { - return bulkQuery.query(child).map { childData -> childData!! } + return bulkQuery.query(child).map { childData -> checkNotNull(childData) { "Entry not found: $child" } } } override fun visitEntries(bulkQuery: IBulkQuery, visitor: (Long, KVEntryReference) -> Unit): IBulkQuery.Value { diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/MapBasedStore.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/MapBasedStore.kt index 25dd850a98..5d9d3adbbb 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/MapBasedStore.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/MapBasedStore.kt @@ -19,6 +19,7 @@ import org.modelix.kotlin.utils.createMemoryEfficientMap import org.modelix.kotlin.utils.toSynchronizedMap import org.modelix.model.IKeyListener import org.modelix.model.IKeyValueStore +import org.modelix.model.lazy.BulkQueryConfiguration import org.modelix.model.lazy.IBulkQuery import org.modelix.model.lazy.IDeserializingKeyValueStore import org.modelix.model.lazy.NonBulkQuery @@ -32,7 +33,11 @@ open class MapBasedStore : IKeyValueStore { return map[key] } - override fun newBulkQuery(deserializingCache: IDeserializingKeyValueStore): IBulkQuery { + override fun getIfCached(key: String): String? 
{ + return get(key) + } + + override fun newBulkQuery(deserializingCache: IDeserializingKeyValueStore, config: BulkQueryConfiguration): IBulkQuery { // This implementation doesn't benefit from bulk queries. The NonBulkQuery has a lower performance overhead. return NonBulkQuery(deserializingCache) } diff --git a/model-server-api/src/commonMain/kotlin/org/modelix/model/server/api/v2/ImmutableObjectsStream.kt b/model-server-api/src/commonMain/kotlin/org/modelix/model/server/api/v2/ImmutableObjectsStream.kt new file mode 100644 index 0000000000..11107e0469 --- /dev/null +++ b/model-server-api/src/commonMain/kotlin/org/modelix/model/server/api/v2/ImmutableObjectsStream.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.modelix.model.server.api.v2 + +import io.ktor.http.ContentType +import io.ktor.utils.io.ByteReadChannel +import io.ktor.utils.io.readUTF8Line + +object ImmutableObjectsStream { + val CONTENT_TYPE = ContentType("application", "x-modelix-immutable-objects") + + fun encode(out: Appendable, objects: Map) { + encode(out, objects.asSequence()) + } + + fun encode(out: Appendable, objects: Sequence>) { + objects.forEach { + out.append(it.key) + out.append("\n") + out.append(it.value) + out.append("\n") + } + // additional empty line indicates end of stream and can be used to verify completeness of data transfer + out.append("\n") + } + + suspend fun decode(input: ByteReadChannel): Map { + val objects = LinkedHashMap() + while (true) { + val key = checkNotNull(input.readUTF8Line()) { "Empty line expected at the end of the stream" } + if (key == "") { + check(input.readUTF8Line() == null) { "Empty line is only allowed at the end of the stream" } + break + } + val value = checkNotNull(input.readUTF8Line()) { "Object missing for hash $key" } + objects[key] = value + } + return objects + } +} + +typealias ObjectHash = String +typealias SerializedObject = String +typealias ObjectHashAndSerializedObject = Pair diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt index 4b51822244..a656561822 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt @@ -54,6 +54,7 @@ import org.modelix.model.lazy.CLVersion import org.modelix.model.lazy.RepositoryId import org.modelix.model.operations.OTBranch import org.modelix.model.persistent.HashUtil +import org.modelix.model.server.api.v2.ImmutableObjectsStream import org.modelix.model.server.api.v2.VersionDelta import org.modelix.model.server.api.v2.VersionDeltaStream import org.modelix.model.server.api.v2.VersionDeltaStreamV2 @@ -202,19 +203,16 @@ class ModelReplicationServer( } override suspend fun PipelineContext.postRepositoryObjectsGetAll(repository: String) { - val keys = 
call.receiveStream().bufferedReader().use { reader -> - reader.lineSequence().toHashSet() - } - val objects = withContext(Dispatchers.IO) { modelClient.store.getAll(keys) } - call.respondTextWriter(contentType = VersionDeltaStream.CONTENT_TYPE) { - objects.forEach { - append(it.key) - append("\n") - append(it.value) - append("\n") + runWithRepository(repository) { + val keys = call.receiveStream().bufferedReader().use { reader -> + reader.lineSequence().toHashSet() + } + val objects = withContext(Dispatchers.IO) { modelClient.store.getAll(keys) }.checkValuesNotNull { + "Object not found: $it" + } + call.respondTextWriter(contentType = ImmutableObjectsStream.CONTENT_TYPE) { + ImmutableObjectsStream.encode(this, objects) } - // additional empty line indicates end of stream and can be used to verify completeness of data transfer - append("\n") } } @@ -433,3 +431,10 @@ private fun Flow.withSeparator(separator: String) = flow { emit(it) } } + +@Suppress("UNCHECKED_CAST") +private fun Map.checkValuesNotNull(lazyMessage: (K) -> Any): Map = apply { + for (entry in this) { + checkNotNull(entry.value) { lazyMessage(entry.key) } + } +} as Map diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/LocalModelClient.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/LocalModelClient.kt index 7aa49cb3e0..e990031e4a 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/store/LocalModelClient.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/LocalModelClient.kt @@ -33,6 +33,10 @@ class LocalModelClient(val store: IStoreClient) : IModelClient { return store[key] } + override fun getIfCached(key: String): String? { + return null + } + override fun put(key: String, value: String?) { store.put(key, value) } diff --git a/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt index 70a515de2e..7dfc90da84 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt @@ -21,12 +21,20 @@ import io.ktor.server.testing.testApplication import org.modelix.authorization.installAuthentication import org.modelix.model.api.INode import org.modelix.model.api.NullChildLink +import org.modelix.model.api.PBranch import org.modelix.model.api.TreePointer import org.modelix.model.api.addNewChild import org.modelix.model.api.getDescendants import org.modelix.model.api.getRootNode +import org.modelix.model.client.IdGenerator +import org.modelix.model.client2.IModelClientV2 +import org.modelix.model.client2.IModelClientV2Internal import org.modelix.model.client2.lazyLoadVersion -import org.modelix.model.client2.runWrite +import org.modelix.model.lazy.BranchReference +import org.modelix.model.lazy.CLTree +import org.modelix.model.lazy.CLVersion +import org.modelix.model.lazy.CacheConfiguration +import org.modelix.model.lazy.ObjectStoreCache import org.modelix.model.lazy.RepositoryId import org.modelix.model.persistent.MapBasedStore import org.modelix.model.server.api.v2.ObjectHash @@ -34,81 +42,203 @@ import org.modelix.model.server.api.v2.SerializedObject import org.modelix.model.server.handlers.IdsApiImpl import org.modelix.model.server.handlers.ModelReplicationServer import org.modelix.model.server.store.InMemoryStoreClient +import org.modelix.model.server.store.forContextRepository +import kotlin.random.Random import kotlin.test.Test -import kotlin.test.assertTrue +import 
kotlin.test.assertEquals +@Suppress("ktlint:standard:annotation", "ktlint:standard:spacing-between-declarations-with-annotations") class LazyLoadingTest { - - private lateinit var statistics: StoreClientWithStatistics + private var totalRequests: Long = 0 + private var totalObjects: Long = 0 + private var maxRequestSize: Int = 0 private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication { application { installAuthentication(unitTestMode = true) installDefaultServerPlugins() - statistics = StoreClientWithStatistics(InMemoryStoreClient()) - ModelReplicationServer(statistics).init(this) - IdsApiImpl(statistics).init(this) + val store = InMemoryStoreClient().forContextRepository() + ModelReplicationServer(store).init(this) + IdsApiImpl(store).init(this) } block() } - private fun assertRequestCount(atLeast: Long, body: () -> Unit): Long { - val requestCount = measureRequests(body) - assertTrue(requestCount >= atLeast, "At least $atLeast requests expected, but was $requestCount") - return requestCount - } - - private fun measureRequests(body: () -> Unit): Long { - val before = statistics.getTotalRequests() + private fun measureRequests(body: () -> Unit): Pair { + maxRequestSize = 0 + val before = totalRequests + val objectsBefore = totalObjects body() - val after = statistics.getTotalRequests() - val requestCount = after - before + val after = totalRequests + val objectsAfter = totalObjects + val requestCount = (after - before).toInt() + val requestedObjectsCount = (objectsAfter - objectsBefore).toInt() println("Requests: $requestCount") - return requestCount + println("Requested Objects: $requestedObjectsCount") + println("Max Request Size: $maxRequestSize") + return Pair(requestCount, requestedObjectsCount) } - @Test - fun `model data is loaded on demand`() = runTest { - // After optimizing the lazy loading to send less (but bigger) requests, this test might fail. - // Just update the model size, cache size and expected request count to fix it. 
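+    // Each comparison test below passes six expected measurements to runLazyLoadingTest, in this order: the request
+    // counts for the three measured phases (walk to the first leaf, first full traversal, second full traversal),
+    // followed by the requested-object counts for the same three phases (see the reordering in runLazyLoadingTest).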
+ @Test fun compare_batch_size_10() = compare_batch_size(10, 22, 201, 187, 137, 1996, 1863) + @Test fun compare_batch_size_25() = compare_batch_size(25, 22, 103, 82, 285, 2575, 2050) + @Test fun compare_batch_size_50() = compare_batch_size(50, 22, 87, 72, 510, 4350, 3600) + @Test fun compare_batch_size_100() = compare_batch_size(100, 22, 60, 49, 905, 6000, 4900) + @Test fun compare_batch_size_200() = compare_batch_size(200, 22, 142, 160, 1605, 28400, 32000) + @Test fun compare_batch_size_400() = compare_batch_size(400, 22, 743, 138, 2627, 216698, 18369) + @Test fun compare_batch_size_800() = compare_batch_size(800, 22, 717, 185, 2700, 205909, 23806) + @Test fun compare_batch_size_1600() = compare_batch_size(1600, 22, 717, 185, 2700, 205909, 23806) + fun compare_batch_size(batchSize: Int, vararg expected: Int) = runLazyLoadingTest(DepthFirstSearchPattern, 1_000, 1_000, batchSize, batchSize, *expected) + + @Test fun compare_cache_size_100() = compare_cache_size(100, 22, 861, 891, 510, 43050, 44550) + @Test fun compare_cache_size_200() = compare_cache_size(200, 22, 539, 490, 510, 26950, 24500) + @Test fun compare_cache_size_400() = compare_cache_size(400, 22, 217, 139, 510, 10850, 6950) + @Test fun compare_cache_size_800() = compare_cache_size(800, 22, 57, 73, 510, 2850, 3650) + @Test fun compare_cache_size_1600() = compare_cache_size(1600, 22, 44, 41, 510, 2200, 2050) + @Test fun compare_cache_size_3200() = compare_cache_size(3200, 22, 34, 0, 510, 1561, 0) + private fun compare_cache_size(cacheSize: Int, vararg expected: Int) = runLazyLoadingTest(DepthFirstSearchPattern, 1_000, cacheSize, 50, 50, *expected) - val client = createModelClient() + @Test fun compare_prefetch_size_0() = compare_prefetch_size(0, 22, 2055, 2073, 22, 2055, 2073) + @Test fun compare_prefetch_size_2() = compare_prefetch_size(2, 22, 1043, 1046, 38, 2086, 2092) + @Test fun compare_prefetch_size_4() = compare_prefetch_size(3, 22, 693, 708, 53, 2079, 2124) + @Test fun compare_prefetch_size_10() = compare_prefetch_size(10, 22, 288, 333, 137, 2880, 3330) + @Test fun compare_prefetch_size_25() = compare_prefetch_size(25, 22, 339, 296, 285, 8475, 7400) + @Test fun compare_prefetch_size_50() = compare_prefetch_size(50, 22, 539, 490, 510, 26950, 24500) + private fun compare_prefetch_size(prefetchSize: Int, vararg expected: Int) = runLazyLoadingTest(DepthFirstSearchPattern, 1_000, 200, 50, prefetchSize, *expected) + + @Test fun compare_access_pattern_dfs() = compare_access_pattern(DepthFirstSearchPattern, 22, 123, 133, 510, 6150, 6650) + @Test fun compare_access_pattern_pdfs() = compare_access_pattern(ParallelDepthFirstSearchPattern, 22, 325, 174, 510, 16250, 8700) + @Test fun compare_access_pattern_bfs() = compare_access_pattern(BreathFirstSearchPattern, 22, 320, 278, 510, 16000, 13900) + @Test fun compare_access_pattern_random() = compare_access_pattern(RandomPattern(1_000, Random(987)), 22, 194, 154, 510, 5388, 3878) + private fun compare_access_pattern(pattern: AccessPattern, vararg expected: Int) = runLazyLoadingTest(pattern, 1_000, 500, 50, 50, *expected) + + private fun runLazyLoadingTest(accessPattern: AccessPattern, numberOfNodes: Int, cacheSize: Int, batchSize: Int, prefetchSize: Int, vararg expectedRequests: Int) { + runLazyLoadingTest(accessPattern, numberOfNodes, cacheSize, batchSize, prefetchSize, expectedRequests.toList()) + } + + private fun runLazyLoadingTest(accessPattern: AccessPattern, numberOfNodes: Int, cacheSize: Int, batchSize: Int, prefetchSize: Int, expectedRequests: List) = runTest { + val client = 
ModelClientWithStatistics(createModelClient()) val branchRef = RepositoryId("my-repo").getBranchReference() - client.runWrite(branchRef) { - fun createNodes(parentNode: INode, numberOfNodes: Int) { + createModel(client, branchRef, numberOfNodes) + + val version = client.lazyLoadVersion( + branchRef, + CacheConfiguration().also { + it.cacheSize = cacheSize + it.requestBatchSize = batchSize + it.prefetchBatchSize = prefetchSize + }, + ) + val rootNode = TreePointer(version.getTree()).getRootNode() + + val actualRequestCount = ArrayList() + + // Traverse to the first leaf node. This should load some data, but not the whole model. + actualRequestCount += measureRequests { + generateSequence(rootNode) { it.allChildren.firstOrNull() }.count() + }.toList() + + // Traverse the whole model. + actualRequestCount += measureRequests { + accessPattern.runPattern(rootNode) + }.toList() + + // Traverse the whole model a second time. The model doesn't fit into the cache and some parts are already + // unloaded during the first traversal. The unloaded parts need to be requested again. + // But the navigation to the first leaf is like a warmup of the cache for the whole model traversal. + // The previous traversal can benefit from that, but the next one cannot and is expected to need more requests. + actualRequestCount += measureRequests { + accessPattern.runPattern(rootNode) + }.toList() + + // move request count before object count + val reorderedActualRequests = actualRequestCount.withIndex().sortedBy { it.index % 2 }.map { it.value } + + assertEquals(expectedRequests, reorderedActualRequests) + } + + /** + * Creates a CLTree with fixed ID and a version without timestamp. + * This ensures that exactly the same data is created for each test run which avoids non-deterministic test results. + */ + private suspend fun createModel(client: IModelClientV2, branchRef: BranchReference, numberOfNodes: Int) { + val initialTree = CLTree.builder(ObjectStoreCache(MapBasedStore())).repositoryId(RepositoryId("xxx")).build() + val branch = PBranch(initialTree, IdGenerator.newInstance(100)) + val rootNode = branch.getRootNode() + branch.runWrite { + fun createNodes(parentNode: INode, numberOfNodes: Int, rand: Random) { if (numberOfNodes == 0) return if (numberOfNodes == 1) { parentNode.addNewChild(NullChildLink, 0) return } - val subtreeSize1 = numberOfNodes / 2 - val subtreeSize2 = numberOfNodes - subtreeSize1 - createNodes(parentNode.addNewChild(NullChildLink, 0), subtreeSize1 - 1) - createNodes(parentNode.addNewChild(NullChildLink, 1), subtreeSize2 - 1) + val numChildren = rand.nextInt(2, 10.coerceAtMost(numberOfNodes) + 1) + val subtreeSize = numberOfNodes / numChildren + val remainder = numberOfNodes % numChildren + for (i in 1..numChildren) { + createNodes(parentNode.addNewChild(NullChildLink, 0), subtreeSize - 1 + (if (i == 1) remainder else 0), rand) + } } - createNodes(it, 5_000) + createNodes(rootNode, numberOfNodes, Random(10001)) } - val version = client.lazyLoadVersion(branchRef, cacheSize = 500) - - val rootNode = TreePointer(version.getTree()).getRootNode() + val initialVersion = CLVersion.createRegularVersion( + id = 1000L, + time = null, + author = null, + tree = branch.computeReadT { it.tree } as CLTree, + baseVersion = null, + operations = emptyArray(), + ) + client.push(branchRef, initialVersion, null) + } - // Traverse to the first leaf node. This should load some data, but not the whole model. 
- assertRequestCount(1) { - generateSequence(rootNode) { it.allChildren.firstOrNull() }.count() + private inner class ModelClientWithStatistics(val client: IModelClientV2Internal) : IModelClientV2Internal by client { + override suspend fun getObjects(repository: RepositoryId, keys: Sequence): Map { + totalRequests++ + totalObjects += keys.count() + return client.getObjects(repository, keys) } + } +} - // Traverse the whole model. - val requestCountFirstTraversal = assertRequestCount(10) { - rootNode.getDescendants(true).count() +private interface AccessPattern { + fun runPattern(rootNode: INode) +} + +private object DepthFirstSearchPattern : AccessPattern { + override fun runPattern(rootNode: INode) { + rootNode.getDescendants(true).count() + } +} + +private object ParallelDepthFirstSearchPattern : AccessPattern { + override fun runPattern(rootNode: INode) { + rootNode.getDescendants(true).zip(rootNode.getDescendants(true).drop(100)).count() + } +} + +private object BreathFirstSearchPattern : AccessPattern { + override fun runPattern(rootNode: INode) { + val queue = ArrayDeque() + queue.addLast(rootNode) + while (queue.isNotEmpty()) { + queue.addAll(queue.removeFirst().allChildren) } + } +} - // Traverse the whole model a second time. The model doesn't fit into the cache and some parts are already - // unloaded during the first traversal. The unloaded parts need to be requested again. - // But the navigation to the first leaf is like a warmup of the cache for the whole model traversal. - // The previous traversal can benefit from that, but the next one cannot and is expected to need more requests. - assertRequestCount(requestCountFirstTraversal + 1) { - rootNode.getDescendants(true).count() +private class RandomPattern(val maxAccessOperations: Int, val random: kotlin.random.Random) : AccessPattern { + override fun runPattern(rootNode: INode) { + var currentNode = rootNode + + for (i in 1..maxAccessOperations) { + val nextNode = when (random.nextInt(2)) { + 0 -> currentNode.parent ?: currentNode.allChildren.toList().random(random) + else -> currentNode.allChildren.toList().let { + if (it.isEmpty()) currentNode.parent!! else it.random(random) + } + } + currentNode = nextNode } } }
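Usage note on the stream format introduced above: ImmutableObjectsStream writes each object as a hash line followed by a value line and terminates the stream with an extra empty line. A minimal JVM-side round-trip sketch (the hashes and payloads below are made-up placeholders, not real SHA-256 object hashes):

import io.ktor.utils.io.ByteReadChannel
import kotlinx.coroutines.runBlocking
import org.modelix.model.server.api.v2.ImmutableObjectsStream

fun main() = runBlocking {
    // Two made-up entries; decode() only relies on the "hash\nvalue\n" pairing and the terminating empty line.
    val objects = linkedMapOf(
        "hash-1" to "serialized-object-1",
        "hash-2" to "serialized-object-2",
    )
    val encoded = StringBuilder().also { ImmutableObjectsStream.encode(it, objects) }.toString()
    check(encoded == "hash-1\nserialized-object-1\nhash-2\nserialized-object-2\n\n")
    val decoded = ImmutableObjectsStream.decode(ByteReadChannel(encoded))
    check(decoded == objects)
}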