diff --git a/Agent.kt b/Agent.kt index cf506f0dc..368577cc9 100644 --- a/Agent.kt +++ b/Agent.kt @@ -51,7 +51,7 @@ abstract class Agent> prot fun run(session: SESSION, partition: PARTITION, random: RandomSource): List { return actionHandlers[action]?.let { - (0 until runsPerIteration).flatMap { it(session, partition, random) } + (0 until runsPerIteration).flatMap { it(session, partition, random.nextSource()) } } ?: throw IllegalArgumentException("The action '$action' has no registered handler in '${javaClass.simpleName}'" + if (action == DEFAULT_ACTION) " (help: '$action' is the default action)" else "") } @@ -60,16 +60,15 @@ abstract class Agent> prot return context.isTracing && this.tracingEnabled } - fun iterate(randomSrc: RandomSource): Map> { + fun iterate(randomSource: RandomSource): Map> { val reports = ConcurrentHashMap>() // We need to generate pairs of Partition and Random deterministically before passing them to a parallel stream if (context.partitionCount > partitions.size) throw IllegalArgumentException("Partition count exceeds supplied number of partitions.") val validPartitions = partitions.subList(0, context.partitionCount) val asyncRuns = validPartitions.map { partition -> - val randomSrc2 = randomSrc.nextSource() CompletableFuture.runAsync( { - val report = runAndMayTrace(partition, randomSrc2) + val report = runAndMayTrace(partition, randomSource) if (context.isReporting) reports[partition.tracker] = report else assert(report.isEmpty()) }, context.executor ) @@ -78,13 +77,13 @@ abstract class Agent> prot return reports } - private fun runAndMayTrace(partition: PARTITION, random: RandomSource): List { + private fun runAndMayTrace(partition: PARTITION, randomSource: RandomSource): List { var tracingCtx: FactoryTracingThreadStatic.ThreadContext? = null return try { if (shouldTrace()) tracingCtx = FactoryTracingThreadStatic.contextOnThread(partition.tracker, context.iterationNumber) val session = client.session(partition) - mayTrace(className(agentClass)) { run(session, partition, random) } + mayTrace(className(agentClass)) { run(session, partition, randomSource) } } finally { tracingCtx?.close() } diff --git a/Context.kt b/Context.kt index 5ccdcd829..edd3f43c4 100644 --- a/Context.kt +++ b/Context.kt @@ -35,6 +35,7 @@ open class Context( val dbName = config.run.databaseName val iterationMax = config.run.iterations val partitionCount = config.run.partitions + val recreateDatabase = config.run.recreateDatabase val model = config.model diff --git a/Simulation.kt b/Simulation.kt index 33c70251f..d7c7001a2 100644 --- a/Simulation.kt +++ b/Simulation.kt @@ -36,7 +36,12 @@ abstract class Simulation, out CONTEXT: Context<*, *>>( protected abstract val name: String fun init() { - init(randomSource.nextSource()) + // The master random source is incremented regardless of whether the database is recreated or not to ensure determinism. + // The potentially redundant randomSource2 has to be created in this way for it to work. + // Please refer to: https://github.com/vaticle/typedb-simulation/issues/145 + + val randomSource2 = randomSource.nextSource() + if (context.recreateDatabase) init(randomSource2) } abstract fun init(randomSource: RandomSource) @@ -76,8 +81,8 @@ abstract class Simulation, out CONTEXT: Context<*, *>>( val agents = initAgents() agents.forEach { agent -> val start = Instant.now() - val reports = agent.iterate(randomSource.nextSource()) - LOGGER.info("{}.{} took: {}", agent.javaClass.simpleName, agent.action, printDuration(start, Instant.now())) + val reports = agent.iterate(randomSource) + LOGGER.info("{}.{} × {} took: {}", agent.javaClass.simpleName, agent.action, agent.runsPerIteration, printDuration(start, Instant.now())) agentReports[agent.javaClass.superclass.simpleName] = reports } context.incrementIteration() diff --git a/common/params/Config.kt b/common/params/Config.kt index 04119c031..190b022f5 100644 --- a/common/params/Config.kt +++ b/common/params/Config.kt @@ -29,6 +29,7 @@ import com.vaticle.typedb.simulation.common.params.Config.Keys.TRACE_SAMPLING import com.vaticle.typedb.common.yaml.YAML import com.vaticle.typedb.simulation.common.params.Config.Keys.ACTION import com.vaticle.typedb.simulation.common.params.Config.Keys.PARTITIONS +import com.vaticle.typedb.simulation.common.params.Config.Keys.RECREATE_DATABASE import com.vaticle.typedb.simulation.common.params.Config.Keys.RUNS_PER_ITERATION import java.io.File import kotlin.math.ln @@ -82,13 +83,14 @@ class Config(val agents: List, val traceSampling: TraceSamplin } } - class Run(val randomSeed: Long, val iterations: Int, val partitions: Int, val databaseName: String) { + class Run(val randomSeed: Long, val iterations: Int, val partitions: Int, val databaseName: String, val recreateDatabase: Boolean) { companion object { internal fun of(yaml: YAML.Map) = Run( databaseName = yaml[DATABASE_NAME].asString().value(), iterations = yaml[ITERATIONS].asInt().value(), partitions = yaml[PARTITIONS].asInt().value(), - randomSeed = yaml[RANDOM_SEED].asInt().value().toLong() + randomSeed = yaml[RANDOM_SEED].asInt().value().toLong(), + recreateDatabase = yaml[RECREATE_DATABASE].asBoolean().value() ) } } @@ -118,6 +120,7 @@ class Config(val agents: List, val traceSampling: TraceSamplin const val PARTITIONS = "partitions" const val NAME = "name" const val RANDOM_SEED = "randomSeed" + const val RECREATE_DATABASE = "recreateDatabase" const val RUN = "run" const val RUNS_PER_ITERATION = "runsPerIteration" const val TRACE = "trace" diff --git a/neo4j/Neo4jSimulation.kt b/neo4j/Neo4jSimulation.kt index c87e7f70e..c4971d72d 100644 --- a/neo4j/Neo4jSimulation.kt +++ b/neo4j/Neo4jSimulation.kt @@ -37,7 +37,6 @@ abstract class Neo4jSimulation> protected constructor override fun init(randomSource: RandomSource) { val nativeDriver = client.unpack() initDatabase(nativeDriver) - LOGGER.info("Neo4j initialisation of $name simulation data started ...") val start = Instant.now() initData(nativeDriver, randomSource)