From 07f1095e06ca24b14b225cf2a0f1f5f476620891 Mon Sep 17 00:00:00 2001 From: Denis Lochmelis Date: Tue, 6 Aug 2024 20:57:33 +0200 Subject: [PATCH] Replace all "join handles" with BlockingFuture --- .../org/jetbrains/litmuskt/LitmusRunner.kt | 10 +++++----- .../kotlin/org/jetbrains/litmuskt/Threadlike.kt | 16 ++++++---------- .../org/jetbrains/litmuskt/ThreadlikeRunner.kt | 6 +++--- .../kotlin/org/jetbrains/litmuskt/Utils.kt | 7 +++++++ .../org/jetbrains/litmuskt/PthreadThreadlike.kt | 2 +- .../org/jetbrains/litmuskt/WorkerThreadlike.kt | 2 +- 6 files changed, 23 insertions(+), 20 deletions(-) diff --git a/core/src/commonMain/kotlin/org/jetbrains/litmuskt/LitmusRunner.kt b/core/src/commonMain/kotlin/org/jetbrains/litmuskt/LitmusRunner.kt index 2211f36..4af65dd 100644 --- a/core/src/commonMain/kotlin/org/jetbrains/litmuskt/LitmusRunner.kt +++ b/core/src/commonMain/kotlin/org/jetbrains/litmuskt/LitmusRunner.kt @@ -15,13 +15,13 @@ abstract class LitmusRunner { barrierProducer: BarrierProducer, syncPeriod: Int, affinityMap: AffinityMap?, - ): () -> LitmusResult + ): BlockingFuture /** * Entry point for running tests. This method can be overridden in case that particular runner * does not need to allocate states. */ - open fun startTest(test: LitmusTest, params: LitmusRunParams): () -> LitmusResult { + open fun startTest(test: LitmusTest, params: LitmusRunParams): BlockingFuture { val states = TypedArray(params.batchSize) { test.stateProducer() } return startTest(test, states, params.barrierProducer, params.syncPeriod, params.affinityMap) } @@ -38,7 +38,7 @@ abstract class LitmusRunner { test: LitmusTest, params: LitmusRunParams, instances: Int, - ): List<() -> LitmusResult> { + ): List> { // separated due to allocations severely impacting threads val allStates = List(instances) { TypedArray(params.batchSize) { test.stateProducer() } @@ -113,7 +113,7 @@ fun LitmusRunner.runSingleTestParallel( timeLimit: Duration = Duration.ZERO, instances: Int = cpuCount() / test.threadCount, ): LitmusResult = repeatFor(timeLimit) { - startTestParallel(test, params, instances).map { it() }.mergeResults() + startTestParallel(test, params, instances).map { it.await() }.mergeResults() }.mergeResults() /** @@ -126,7 +126,7 @@ fun LitmusRunner.runTests( params: LitmusRunParams, timeLimit: Duration = Duration.ZERO, ): List = tests.map { test -> - repeatFor(timeLimit) { startTest(test, params).invoke() }.mergeResults() + repeatFor(timeLimit) { startTest(test, params).await() }.mergeResults() } // guaranteed to run [f] at least once diff --git a/core/src/commonMain/kotlin/org/jetbrains/litmuskt/Threadlike.kt b/core/src/commonMain/kotlin/org/jetbrains/litmuskt/Threadlike.kt index 0323e61..0309517 100644 --- a/core/src/commonMain/kotlin/org/jetbrains/litmuskt/Threadlike.kt +++ b/core/src/commonMain/kotlin/org/jetbrains/litmuskt/Threadlike.kt @@ -6,23 +6,19 @@ package org.jetbrains.litmuskt */ interface Threadlike { /** - * Start running the function in a "thread". Note that the function should be non-capturing. + * Start running the function in a "thread". * - * This function should be only called once. + * Notes: + * 1. This function should be only called once. + * 1. [function] should be non-capturing. + * 1. Since returning a value is not currently supported, the resulting future returns a stub (Unit). * * @return a "future" handle that will block when called until the "thread" has completed. */ - fun start(args: A, function: (A) -> Unit): BlockingFuture + fun start(args: A, function: (A) -> Unit): BlockingFuture /** * Dispose of any resources the "thread" has allocated. Blocks until the resources are cleaned. */ fun dispose() } - -/** - * A future that blocks on calling [await] and returns nothing. - */ -fun interface BlockingFuture { - fun await() -} diff --git a/core/src/commonMain/kotlin/org/jetbrains/litmuskt/ThreadlikeRunner.kt b/core/src/commonMain/kotlin/org/jetbrains/litmuskt/ThreadlikeRunner.kt index ed9f9b7..ae7d5df 100644 --- a/core/src/commonMain/kotlin/org/jetbrains/litmuskt/ThreadlikeRunner.kt +++ b/core/src/commonMain/kotlin/org/jetbrains/litmuskt/ThreadlikeRunner.kt @@ -14,7 +14,7 @@ abstract class ThreadlikeRunner : LitmusRunner() { rangeResult = calcStats(states.view(resultCalcRange), test.outcomeSpec, test.outcomeFinalizer) } - private data class ThreadContext( + private class ThreadContext( val states: Array, val test: LitmusTest, val threadIndex: Int, @@ -30,7 +30,7 @@ abstract class ThreadlikeRunner : LitmusRunner() { barrierProducer: BarrierProducer, syncPeriod: Int, affinityMap: AffinityMap? - ): () -> LitmusResult { + ): BlockingFuture { val threads = List(test.threadCount) { threadlikeProducer() } @@ -54,7 +54,7 @@ abstract class ThreadlikeRunner : LitmusRunner() { } } - return { + return BlockingFuture { futures.forEach { it.await() } // await all results threads.forEach { it.dispose() } // stop all "threads" contexts.map { it.rangeResult!! }.mergeResults() diff --git a/core/src/commonMain/kotlin/org/jetbrains/litmuskt/Utils.kt b/core/src/commonMain/kotlin/org/jetbrains/litmuskt/Utils.kt index e838319..64f3ee5 100644 --- a/core/src/commonMain/kotlin/org/jetbrains/litmuskt/Utils.kt +++ b/core/src/commonMain/kotlin/org/jetbrains/litmuskt/Utils.kt @@ -52,3 +52,10 @@ fun IntRange.splitEqual(n: Int): List { } } } + +/** + * A future that blocks on calling [await]. + */ +fun interface BlockingFuture { + fun await(): T +} diff --git a/core/src/nativeMain/kotlin/org/jetbrains/litmuskt/PthreadThreadlike.kt b/core/src/nativeMain/kotlin/org/jetbrains/litmuskt/PthreadThreadlike.kt index c7511e7..7631284 100644 --- a/core/src/nativeMain/kotlin/org/jetbrains/litmuskt/PthreadThreadlike.kt +++ b/core/src/nativeMain/kotlin/org/jetbrains/litmuskt/PthreadThreadlike.kt @@ -13,7 +13,7 @@ class PthreadThreadlike : Threadlike { private class ThreadData(val args: A, val function: (A) -> Unit) - override fun start(args: A, function: (A) -> Unit): BlockingFuture { + override fun start(args: A, function: (A) -> Unit): BlockingFuture { val threadData = ThreadData(args, function) val threadDataRef = StableRef.create(threadData) diff --git a/core/src/nativeMain/kotlin/org/jetbrains/litmuskt/WorkerThreadlike.kt b/core/src/nativeMain/kotlin/org/jetbrains/litmuskt/WorkerThreadlike.kt index 5724ed1..7c4f1f3 100644 --- a/core/src/nativeMain/kotlin/org/jetbrains/litmuskt/WorkerThreadlike.kt +++ b/core/src/nativeMain/kotlin/org/jetbrains/litmuskt/WorkerThreadlike.kt @@ -14,7 +14,7 @@ class WorkerThreadlike : Threadlike { val threadFunction: (A) -> Unit, ) - override fun start(args: A, function: (A) -> Unit): BlockingFuture { + override fun start(args: A, function: (A) -> Unit): BlockingFuture { val context = WorkerContext(args, function) val future = worker.execute( TransferMode.SAFE /* ignored */,