Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parametrised database reinitialisation #144

Merged
merged 10 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions Agent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ abstract class Agent<PARTITION: Partition, SESSION, CONTEXT: Context<*, *>> prot

fun run(session: SESSION, partition: PARTITION, random: RandomSource): List<Report> {
return actionHandlers[action]?.let {
(0 until runsPerIteration).flatMap { it(session, partition, random) }
(0 until runsPerIteration).flatMap { it(session, partition, random.nextSource()) }
james-whiteside marked this conversation as resolved.
Show resolved Hide resolved
} ?: throw IllegalArgumentException("The action '$action' has no registered handler in '${javaClass.simpleName}'"
+ if (action == DEFAULT_ACTION) " (help: '$action' is the default action)" else "")
}
Expand All @@ -60,16 +60,15 @@ abstract class Agent<PARTITION: Partition, SESSION, CONTEXT: Context<*, *>> prot
return context.isTracing && this.tracingEnabled
}

fun iterate(randomSrc: RandomSource): Map<String, List<Report>> {
fun iterate(randomSource: RandomSource): Map<String, List<Report>> {
val reports = ConcurrentHashMap<String, List<Report>>()
// We need to generate pairs of Partition and Random deterministically before passing them to a parallel stream
if (context.partitionCount > partitions.size) throw IllegalArgumentException("Partition count exceeds supplied number of partitions.")
val validPartitions = partitions.subList(0, context.partitionCount)
val asyncRuns = validPartitions.map { partition ->
val randomSrc2 = randomSrc.nextSource()
CompletableFuture.runAsync(
{
val report = runAndMayTrace(partition, randomSrc2)
val report = runAndMayTrace(partition, randomSource)
if (context.isReporting) reports[partition.tracker] = report else assert(report.isEmpty())
}, context.executor
)
Expand All @@ -78,13 +77,13 @@ abstract class Agent<PARTITION: Partition, SESSION, CONTEXT: Context<*, *>> prot
return reports
}

private fun runAndMayTrace(partition: PARTITION, random: RandomSource): List<Report> {
private fun runAndMayTrace(partition: PARTITION, randomSource: RandomSource): List<Report> {
var tracingCtx: FactoryTracingThreadStatic.ThreadContext? = null
return try {
if (shouldTrace()) tracingCtx =
FactoryTracingThreadStatic.contextOnThread(partition.tracker, context.iterationNumber)
val session = client.session(partition)
mayTrace(className(agentClass)) { run(session, partition, random) }
mayTrace(className(agentClass)) { run(session, partition, randomSource) }
} finally {
tracingCtx?.close()
}
Expand Down
1 change: 1 addition & 0 deletions Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ open class Context<out SEED_DATA, out MODEL_PARAMS>(
val dbName = config.run.databaseName
val iterationMax = config.run.iterations
val partitionCount = config.run.partitions
val recreateDatabase = config.run.recreateDatabase

val model = config.model

Expand Down
11 changes: 8 additions & 3 deletions Simulation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ abstract class Simulation<CLIENT: DBClient<*>, out CONTEXT: Context<*, *>>(
protected abstract val name: String

fun init() {
init(randomSource.nextSource())
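// The master random source is advanced regardless of whether the database is recreated, so that later random selections stay deterministic either way.
// The potentially redundant randomSource2 must be assigned to a local before the check: calling nextSource() inline in an if/else was found to break determinism.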
// Please refer to: https://github.com/vaticle/typedb-simulation/issues/145

val randomSource2 = randomSource.nextSource()
if (context.recreateDatabase) init(randomSource2)
Comment on lines +43 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when recreateDatabase is false, we generate a new random source but just discard it?

While this is probably harmless, it reads awkwardly. We should not do that.

Suggested change
val randomSource2 = randomSource.nextSource()
if (context.recreateDatabase) init(randomSource2)
if (context.recreateDatabase) init(randomSource.nextSource())

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed the comment in the PR description about incrementing the master random source whether the DB is recreated or not.

If that's the behaviour we want, then fine - I'd say it has the potential to blindside the reader though! Could we add a comment in the code itself clarifying that yes, this is really what we want to do here (and why)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is the best way to ensure the random selections remain deterministic. Resolved as discussed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following change was tested (lines 39 and 40):

if (context.recreateDatabase) init(randomSource.nextSource())
else randomSource.nextSource()

however this did not produce deterministic results. The following unit test was run:

/**
 * Checks that a parent [RandomSource] yields the same next value whether the child obtained
 * from `nextSource()` is used heavily or discarded immediately.
 *
 * Source A derives a child and draws one million booleans from it; source B derives a child
 * inside an `else` branch and throws it away. Both parents are then compared on their next draw.
 *
 * NOTE(review): everything here runs on a single thread, so this test does not cover a source
 * being shared between concurrently running tasks.
 * Kotlin's `assert` only fires when JVM assertions are enabled (`-ea`).
 */
@Test
fun testSource() {
    val seed: Long = 0
    // Two independent sources built from the same seed.
    val randomSourceA = RandomSource(seed)
    val randomSourceB = RandomSource(seed)
    // Sanity check: both sources agree on their first draw.
    assert(randomSourceA.nextInt(1000000) == randomSourceB.nextInt(1000000))

    // Path A: derive a child source and consume it heavily.
    val randomSourceA2 = randomSourceA.nextSource()
    repeat(1000000) {
        randomSourceA2.nextBoolean()
    }

    // Path B: mirrors the "database not recreated" branch, where the child is derived but unused.
    if (false) {}
    else randomSourceB.nextSource()

    // The parents must still agree on their next draw after either path.
    assert(randomSourceA.nextInt(1000000) == randomSourceB.nextInt(1000000))
}

which passed. It is unknown why these changes to lines 39 and 40 affect the determinism of the simulation. An issue has been opened here: https://github.com/vaticle/typedb-simulation/issues/145

}

abstract fun init(randomSource: RandomSource)
Expand Down Expand Up @@ -76,8 +81,8 @@ abstract class Simulation<CLIENT: DBClient<*>, out CONTEXT: Context<*, *>>(
val agents = initAgents()
agents.forEach { agent ->
val start = Instant.now()
val reports = agent.iterate(randomSource.nextSource())
LOGGER.info("{}.{} took: {}", agent.javaClass.simpleName, agent.action, printDuration(start, Instant.now()))
val reports = agent.iterate(randomSource)
LOGGER.info("{}.{} × {} took: {}", agent.javaClass.simpleName, agent.action, agent.runsPerIteration, printDuration(start, Instant.now()))
agentReports[agent.javaClass.superclass.simpleName] = reports
}
context.incrementIteration()
Expand Down
7 changes: 5 additions & 2 deletions common/params/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.vaticle.typedb.simulation.common.params.Config.Keys.TRACE_SAMPLING
import com.vaticle.typedb.common.yaml.YAML
import com.vaticle.typedb.simulation.common.params.Config.Keys.ACTION
import com.vaticle.typedb.simulation.common.params.Config.Keys.PARTITIONS
import com.vaticle.typedb.simulation.common.params.Config.Keys.RECREATE_DATABASE
import com.vaticle.typedb.simulation.common.params.Config.Keys.RUNS_PER_ITERATION
import java.io.File
import kotlin.math.ln
Expand Down Expand Up @@ -82,13 +83,14 @@ class Config<out MODEL>(val agents: List<Agent>, val traceSampling: TraceSamplin
}
}

class Run(val randomSeed: Long, val iterations: Int, val partitions: Int, val databaseName: String) {
class Run(val randomSeed: Long, val iterations: Int, val partitions: Int, val databaseName: String, val recreateDatabase: Boolean) {
companion object {
internal fun of(yaml: YAML.Map) = Run(
databaseName = yaml[DATABASE_NAME].asString().value(),
iterations = yaml[ITERATIONS].asInt().value(),
partitions = yaml[PARTITIONS].asInt().value(),
randomSeed = yaml[RANDOM_SEED].asInt().value().toLong()
randomSeed = yaml[RANDOM_SEED].asInt().value().toLong(),
recreateDatabase = yaml[RECREATE_DATABASE].asBoolean().value()
)
}
}
Expand Down Expand Up @@ -118,6 +120,7 @@ class Config<out MODEL>(val agents: List<Agent>, val traceSampling: TraceSamplin
const val PARTITIONS = "partitions"
const val NAME = "name"
const val RANDOM_SEED = "randomSeed"
const val RECREATE_DATABASE = "recreateDatabase"
const val RUN = "run"
const val RUNS_PER_ITERATION = "runsPerIteration"
const val TRACE = "trace"
Expand Down
1 change: 0 additions & 1 deletion neo4j/Neo4jSimulation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ abstract class Neo4jSimulation<out CONTEXT: Context<*, *>> protected constructor
override fun init(randomSource: RandomSource) {
val nativeDriver = client.unpack()
initDatabase(nativeDriver)

LOGGER.info("Neo4j initialisation of $name simulation data started ...")
val start = Instant.now()
initData(nativeDriver, randomSource)
Expand Down