Skip to content

Commit

Permalink
Replace all "join handles" with BlockingFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
DLochmelis33 committed Aug 6, 2024
1 parent 4067297 commit 07f1095
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ abstract class LitmusRunner {
barrierProducer: BarrierProducer,
syncPeriod: Int,
affinityMap: AffinityMap?,
): () -> LitmusResult
): BlockingFuture<LitmusResult>

/**
* Entry point for running tests. This method can be overridden in case that particular runner
* does not need to allocate states.
*/
open fun <S : Any> startTest(test: LitmusTest<S>, params: LitmusRunParams): () -> LitmusResult {
open fun <S : Any> startTest(test: LitmusTest<S>, params: LitmusRunParams): BlockingFuture<LitmusResult> {
val states = TypedArray(params.batchSize) { test.stateProducer() }
return startTest(test, states, params.barrierProducer, params.syncPeriod, params.affinityMap)
}
Expand All @@ -38,7 +38,7 @@ abstract class LitmusRunner {
test: LitmusTest<S>,
params: LitmusRunParams,
instances: Int,
): List<() -> LitmusResult> {
): List<BlockingFuture<LitmusResult>> {
// separated due to allocations severely impacting threads
val allStates = List(instances) {
TypedArray(params.batchSize) { test.stateProducer() }
Expand Down Expand Up @@ -113,7 +113,7 @@ fun <S : Any> LitmusRunner.runSingleTestParallel(
timeLimit: Duration = Duration.ZERO,
instances: Int = cpuCount() / test.threadCount,
): LitmusResult = repeatFor(timeLimit) {
startTestParallel(test, params, instances).map { it() }.mergeResults()
startTestParallel(test, params, instances).map { it.await() }.mergeResults()
}.mergeResults()

/**
Expand All @@ -126,7 +126,7 @@ fun LitmusRunner.runTests(
params: LitmusRunParams,
timeLimit: Duration = Duration.ZERO,
): List<LitmusResult> = tests.map { test ->
repeatFor(timeLimit) { startTest(test, params).invoke() }.mergeResults()
repeatFor(timeLimit) { startTest(test, params).await() }.mergeResults()
}

// guaranteed to run [f] at least once
Expand Down
16 changes: 6 additions & 10 deletions core/src/commonMain/kotlin/org/jetbrains/litmuskt/Threadlike.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,19 @@ package org.jetbrains.litmuskt
*/
interface Threadlike {
/**
* Start running the function in a "thread". Note that the function should be non-capturing.
* Start running the function in a "thread".
*
* This function should be only called once.
* Notes:
* 1. This function should be only called once.
* 1. [function] should be non-capturing.
* 1. Since returning a value is not currently supported, the resulting future returns a stub (Unit).
*
* @return a "future" handle that will block when called until the "thread" has completed.
*/
fun <A : Any> start(args: A, function: (A) -> Unit): BlockingFuture
fun <A : Any> start(args: A, function: (A) -> Unit): BlockingFuture<Unit>

/**
* Dispose of any resources the "thread" has allocated. Blocks until the resources are cleaned.
*/
fun dispose()
}

/**
* A future that blocks on calling [await] and returns nothing.
*/
fun interface BlockingFuture {
fun await()
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ abstract class ThreadlikeRunner : LitmusRunner() {
rangeResult = calcStats(states.view(resultCalcRange), test.outcomeSpec, test.outcomeFinalizer)
}

private data class ThreadContext<S : Any>(
private class ThreadContext<S : Any>(
val states: Array<S>,
val test: LitmusTest<S>,
val threadIndex: Int,
Expand All @@ -30,7 +30,7 @@ abstract class ThreadlikeRunner : LitmusRunner() {
barrierProducer: BarrierProducer,
syncPeriod: Int,
affinityMap: AffinityMap?
): () -> LitmusResult {
): BlockingFuture<LitmusResult> {

val threads = List(test.threadCount) { threadlikeProducer() }

Expand All @@ -54,7 +54,7 @@ abstract class ThreadlikeRunner : LitmusRunner() {
}
}

return {
return BlockingFuture {
futures.forEach { it.await() } // await all results
threads.forEach { it.dispose() } // stop all "threads"
contexts.map { it.rangeResult!! }.mergeResults()
Expand Down
7 changes: 7 additions & 0 deletions core/src/commonMain/kotlin/org/jetbrains/litmuskt/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,10 @@ fun IntRange.splitEqual(n: Int): List<IntRange> {
}
}
}

/**
* A future that blocks on calling [await].
*/
fun interface BlockingFuture<T> {
fun await(): T
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class PthreadThreadlike : Threadlike {

private class ThreadData<A : Any>(val args: A, val function: (A) -> Unit)

override fun <A : Any> start(args: A, function: (A) -> Unit): BlockingFuture {
override fun <A : Any> start(args: A, function: (A) -> Unit): BlockingFuture<Unit> {
val threadData = ThreadData(args, function)
val threadDataRef = StableRef.create(threadData)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class WorkerThreadlike : Threadlike {
val threadFunction: (A) -> Unit,
)

override fun <A : Any> start(args: A, function: (A) -> Unit): BlockingFuture {
override fun <A : Any> start(args: A, function: (A) -> Unit): BlockingFuture<Unit> {
val context = WorkerContext(args, function)
val future = worker.execute(
TransferMode.SAFE /* ignored */,
Expand Down

0 comments on commit 07f1095

Please sign in to comment.