Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optional Benchmark Warmups #385

Merged
merged 13 commits into from
Nov 22, 2024
3 changes: 3 additions & 0 deletions .github/resources/adhoc-scale-benchmark.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ record.compression=LZO
# Row count to scale tests (Tests can override but typically do not)
scale.row.count=${baseRowCount}

# Row count to scale the warmup run before each test (0 disables warmups)
warmup.row.count=0

# True: Use a timestamp for the parent directory of each test run
# False: Overwrite previous test results for each test run
# Blank: Overwrite if JUnit launch, timestamp if Benchmark main launch
Expand Down
3 changes: 3 additions & 0 deletions .github/resources/compare-scale-benchmark.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ record.compression=SNAPPY
# Row count to scale tests (Tests can override but typically do not)
scale.row.count=70000000

# Row count to scale the warmup run before each test (0 disables warmups)
warmup.row.count=0

# True: Use a timestamp for the parent directory of each test run
# False: Overwrite previous test results for each test run
# Blank: Overwrite if JUnit launch, timestamp if Benchmark main launch
Expand Down
3 changes: 3 additions & 0 deletions .github/resources/nightly-scale-benchmark.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ record.compression=LZ4
# Row count to scale tests (Tests can override but typically do not)
scale.row.count=${baseRowCount}

# Row count to scale the warmup run before each test (0 disables warmups)
warmup.row.count=0

# True: Use a timestamp for the parent directory of each test run
# False: Overwrite previous test results for each test run
# Blank: Overwrite if JUnit launch, timestamp if Benchmark main launch
Expand Down
3 changes: 3 additions & 0 deletions .github/resources/release-scale-benchmark.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ record.compression=LZO
# Row count to scale tests (Tests can override but typically do not)
scale.row.count=${baseRowCount}

# Row count to scale the warmup run before each test (0 disables warmups)
warmup.row.count=0

# True: Use a timestamp for the parent directory of each test run
# False: Overwrite previous test results for each test run
# Blank: Overwrite if JUnit launch, timestamp if Benchmark main launch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,22 +173,30 @@ public void test(String name, String operation, String... loadColumns) {
*/
public void test(String name, long maxExpectedRowCount, String operation, String... loadColumns) {
if (staticFactor > 0) {
var read = getReadOperation(staticFactor, loadColumns);
var result = runStaticTest(name, operation, read, loadColumns);
var sname = name + " -Static";
var warmup = getStaticQuery(sname, operation, getWarmupRowCount(), loadColumns);
var query = getStaticQuery(sname, operation, getGeneratedRowCount(), loadColumns);
var result = runTest(sname, warmup, query);
var rcount = result.resultRowCount();
var ecount = getMaxExpectedRowCount(maxExpectedRowCount, staticFactor);
assertTrue(rcount > 0 && rcount <= ecount, "Wrong result Static row count: " + rcount);
}

if (incFactor > 0) {
var read = getReadOperation(incFactor, loadColumns);
var result = runIncTest(name, operation, read, loadColumns);
var iname = name + " -Inc";
var warmup = getIncQuery(iname, operation, getWarmupRowCount(), loadColumns);
var query = getIncQuery(iname, operation, getGeneratedRowCount(), loadColumns);
var result = runTest(iname, warmup, query);
var rcount = result.resultRowCount();
var ecount = getMaxExpectedRowCount(maxExpectedRowCount, incFactor);
assertTrue(rcount > 0 && rcount <= ecount, "Wrong result Inc row count: " + rcount);
}
}

/**
 * Get the number of rows to use for the warmup query. The value of the
 * <code>warmup.row.count</code> property (default <code>0</code>) is multiplied by the row count
 * factor and truncated to a <code>long</code>.
 *
 * @return the scaled warmup row count
 */
long getWarmupRowCount() {
    var configuredRows = api.propertyAsIntegral("warmup.row.count", "0");
    return (long) (configuredRows * rowCountFactor);
}

/**
 * Get the number of rows to use for the measured query. The value of the
 * <code>scale.row.count</code> property (default <code>100000</code>) is multiplied by the row
 * count factor and truncated to a <code>long</code>.
 *
 * @return the scaled test row count
 */
long getGeneratedRowCount() {
    var configuredRows = api.propertyAsIntegral("scale.row.count", "100000");
    return (long) (configuredRows * rowCountFactor);
}
Expand All @@ -197,30 +205,32 @@ long getMaxExpectedRowCount(long expectedRowCount, long scaleFactor) {
return (expectedRowCount < 1) ? Long.MAX_VALUE : expectedRowCount;
}

String getReadOperation(int scaleFactor, String... loadColumns) {
String getReadOperation(int scaleFactor, long rowCount, String... loadColumns) {
if (scaleFactor > 1 && mainTable.equals("timed") && Arrays.asList(loadColumns).contains("timestamp")) {
var read = """
merge([
read('/data/timed.parquet').view(formulas=[${loadColumns}])
] * ${scaleFactor}).update_view([
'timestamp=timestamp.plusMillis((long)(ii / ${rows}) * ${rows})'
]).select()
]).head(${rows}).select()
""";
return read.replace("${scaleFactor}", "" + scaleFactor).replace("${rows}", "" + getGeneratedRowCount());
return read.replace("${scaleFactor}", "" + scaleFactor).replace("${rows}", "" + rowCount);
}

var read = "read('/data/${mainTable}.parquet').select(formulas=[${loadColumns}])";
read = (loadColumns.length == 0) ? ("empty_table(" + getGeneratedRowCount() + ")") : read;
var read = "read('/data/${mainTable}.parquet').head(${rows}).select(formulas=[${loadColumns}])";
read = (loadColumns.length == 0) ? ("empty_table(${rows})") : read;

if (scaleFactor > 1) {
read = "merge([${readTable}] * ${scaleFactor})".replace("${readTable}", read);
read = read.replace("${scaleFactor}", "" + scaleFactor);
}
return read;
return read.replace("${rows}", "" + rowCount);
}

Result runStaticTest(String name, String operation, String read, String... loadColumns) {
String getStaticQuery(String name, String operation, long warmupRows, String... loadColumns) {
var staticQuery = """
source = right = timed = result = None
bench_api_metrics_init()
${loadSupportTables}
${mainTable} = ${readTable}
loaded_tbl_size = ${mainTable}.size
Expand All @@ -243,12 +253,14 @@ Result runStaticTest(String name, String operation, String read, String... loadC
long_col("result_row_count", [result.size]),
])
""";
return runTest(name + " -Static", staticQuery, operation, read, loadColumns);
var read = getReadOperation(staticFactor, warmupRows, loadColumns);
return populateQuery(name, staticQuery, operation, read, loadColumns);
}

Result runIncTest(String name, String operation, String read, String... loadColumns) {
String getIncQuery(String name, String operation, long warmupRows, String... loadColumns) {
var incQuery = """
source = right = timed = None
source = right = timed = result = source_filter = right_filter = autotune = None
bench_api_metrics_init()
${loadSupportTables}
${mainTable} = ${readTable}
loaded_tbl_size = ${mainTable}.size
Expand Down Expand Up @@ -287,15 +299,11 @@ Result runIncTest(String name, String operation, String read, String... loadColu
long_col("result_row_count", [result.size])
])
""";
return runTest(name + " -Inc", incQuery, operation, read, loadColumns);
var read = getReadOperation(incFactor, warmupRows, loadColumns);
return populateQuery(name, incQuery, operation, read, loadColumns);
}

Result runTest(String name, String query, String operation, String read, String... loadColumns) {
if (api.isClosed())
initialize(testInst);
api.setName(name);
stopUnusedServices(requiredServices);

String populateQuery(String name, String query, String operation, String read, String... loadColumns) {
query = query.replace("${readTable}", read);
query = query.replace("${mainTable}", mainTable);
query = query.replace("${loadSupportTables}", loadSupportTables());
Expand All @@ -305,10 +313,21 @@ Result runTest(String name, String query, String operation, String read, String.
query = query.replace("${operation}", operation);
query = query.replace("${logOperationBegin}", getLogSnippet("Begin", name));
query = query.replace("${logOperationEnd}", getLogSnippet("End", name));
return query;
}


Result runTest(String name, String warmupQuery, String mainQuery) {
if (api.isClosed())
initialize(testInst);
api.setName(name);
stopUnusedServices(requiredServices);

try {
if (getWarmupRowCount() > 0)
api.query(warmupQuery).execute();
var result = new AtomicReference<Result>();
api.query(query).fetchAfter("stats", table -> {
api.query(mainQuery).fetchAfter("stats", table -> {
long loadedRowCount = table.getSum("processed_row_count").longValue();
long resultRowCount = table.getSum("result_row_count").longValue();
long elapsedNanos = table.getSum("elapsed_nanos").longValue();
Expand All @@ -317,9 +336,9 @@ Result runTest(String name, String query, String operation, String read, String.
}).fetchAfter("standard_metrics", table -> {
api.metrics().add(table);
var metrics = new Metrics(Timer.now(), "test-runner", "setup.scale");
metrics.set("static_scale_factor", staticFactor);
metrics.set("inc_scale_factor", incFactor);
metrics.set("row_count_factor", rowCountFactor);
metrics.set("static.factor", staticFactor);
metrics.set("inc.factor", incFactor);
metrics.set("row.factor", rowCountFactor);
api.metrics().add(metrics);
}).execute();
api.result().test("deephaven-engine", result.get().elapsedTime(), result.get().loadedRowCount());
Expand Down Expand Up @@ -353,8 +372,6 @@ void initialize(Object testInst) {
from numpy import typing as npt
import numpy as np
import numba as nb

bench_api_metrics_init()
""";

this.api = Bench.create(testInst);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/deephaven/benchmark/api/Snippets.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def bench_api_metrics_start():
""";

/**
* Get difference from <code>bench_api_metrics_start values and add as collected metrics
* Get difference from <code>bench_api_metrics_start</code> values and add as collected metrics
*/
static String bench_api_metrics_end = """
def bench_api_metrics_end():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ record.compression=SNAPPY
# Row count to scale tests (Tests can override but typically do not)
scale.row.count=100000

# Row count to scale the warmup run before each test (0 disables warmups)
warmup.row.count=0

# True: Use a timestamp for the parent directory of each test run
# False: Overwrite previous test results for each test run
# Blank: Overwrite if JUnit launch, timestamp if Benchmark main launch
Expand Down