Csv read and write benchmarks (deephaven#221)
stanbrub authored Nov 18, 2023
1 parent b07544d commit 027a440
Showing 6 changed files with 288 additions and 152 deletions.
CsvColTypeTest.java
@@ -0,0 +1,41 @@
package io.deephaven.benchmark.tests.standard.file;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;

/**
* Standard tests for writing and reading single-column CSV for different column types.
*/
@TestMethodOrder(OrderAnnotation.class)
class CsvColTypeTest {
final FileTestRunner runner = new FileTestRunner(this);

@Test
@Order(1)
void writeThreeIntegralCols() {
runner.setScaleFactors(5, 3);
runner.runCsvWriteTest("CsvWrite- 3 Integral Cols -Static", "short10K", "int10K", "long10K");
}

@Test
@Order(2)
void readThreeIntegralCols() {
runner.setScaleFactors(5, 3);
runner.runCsvReadTest("CsvRead- 3 Integral Cols -Static", "short10K", "int10K", "long10K");
}

@Test
@Order(3)
void writeOneStringCol() {
runner.setScaleFactors(5, 5);
runner.runCsvWriteTest("CsvWrite- 1 String Col -Static", "str10K");
}

@Test
@Order(4)
void readOneStringCol() {
runner.setScaleFactors(5, 5);
runner.runCsvReadTest("CsvRead- 1 String Col -Static", "str10K");
}

}
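
For illustration (not part of this commit), the same pattern extends to the other pre-defined columns that FileTestRunner recognizes. A hedged sketch, assuming the runner's collapsed generator switch covers "bigDec10K" the way getType does:

// Hypothetical follow-on test, not in the commit: benchmarks CSV write
// performance for a single BigDecimal column, mirroring the String test.
@Test
@Order(5)
void writeOneBigDecimalCol() {
    runner.setScaleFactors(5, 5);
    runner.runCsvWriteTest("CsvWrite- 1 Big Decimal Col -Static", "bigDec10K");
}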
CsvMultiColTest.java
@@ -0,0 +1,33 @@
package io.deephaven.benchmark.tests.standard.file;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;

/**
* Standard tests for writing/reading multi-column data. To save time, the CSV generated by the "write" tests is used by
* the "read" tests.
*/
@TestMethodOrder(OrderAnnotation.class)
class CsvMultiColTest {
final String[] usedColumns = {"str10K", "long10K", "int10K", "short10K"};
final FileTestRunner runner = new FileTestRunner(this);


@BeforeEach
void setup() {
runner.setScaleFactors(5, 2);
}

@Test
@Order(1)
void writeMultiCol() {
runner.runCsvWriteTest("CsvWrite- Multi Col -Static", usedColumns);
}

@Test
@Order(2)
void readMultiCol() {
runner.runCsvReadTest("CsvRead- Multi Col -Static");
}

}
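
For illustration (not from the commit): readMultiCol passes no column names, so in FileTestRunner's read template (later in this diff) getTypes() yields an empty dict and column typing is left to read_csv. A hedged sketch of the resolved query:

// Illustrative only: with no column names, the ${types} placeholder
// resolves to "{}", presumably deferring to read_csv's type inference.
String resolved = "read_csv('/data/source.ptr.csv', {})";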
ParquetTestRunner.java → FileTestRunner.java
@@ -1,4 +1,4 @@
-package io.deephaven.benchmark.tests.standard.parquet;
+package io.deephaven.benchmark.tests.standard.file;

import static org.junit.jupiter.api.Assertions.assertEquals;
import java.time.Duration;
@@ -12,7 +12,7 @@
/**
* Test reading and writing data files (CSV, parquet) with various data types and compression codecs.
*/
-class ParquetTestRunner {
+class FileTestRunner {
final String parquetCfg = "max_dictionary_keys=1048576, max_dictionary_size=1048576, target_page_size=65536";
final Object testInst;
final Bench api;
@@ -21,7 +21,7 @@ class ParquetTestRunner {
private long scaleRowCount;
private boolean useParquetDefaultSettings = false;

-ParquetTestRunner(Object testInst) {
+FileTestRunner(Object testInst) {
this.testInst = testInst;
this.api = initialize(testInst);
this.scaleRowCount = api.propertyAsIntegral("scale.row.count", "100000");
@@ -48,15 +48,63 @@ void useParquetDefaultSettings() {
}

/**
-* Read a benchmark that measures parquet read performance. This tests always runs after a corresponding write test.
+* Run a benchmark that measures CSV read performance. This test always runs after a corresponding write test.
*
* @param testName name that will appear in the results as the benchmark name
*/
-void runReadTest(String testName) {
+void runCsvReadTest(String testName, String... columnNames) {
var q = "read_csv('/data/source.ptr.csv', ${types})";
q = q.replace("${types}", getTypes(columnNames));
runReadTest(testName, q);
}
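
For illustration (assumed invocation, not part of the commit), a call like runCsvReadTest("CsvRead- 3 Integral Cols -Static", "short10K", "int10K", "long10K") resolves the template via getTypes/getType (later in this diff) to:

// Illustrative resolved query text; dht is the deephaven.dtypes alias
// imported by initialize() below.
String resolved = "read_csv('/data/source.ptr.csv', "
        + "{'short10K':dht.short,'int10K':dht.int_,'long10K':dht.long})";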

/**
* Run a benchmark that measures parquet read performance. This test always runs after a corresponding write test.
*
* @param testName name that will appear in the results as the benchmark name
*/
void runParquetReadTest(String testName) {
runReadTest(testName, "read('/data/source.ptr.parquet').select()");
}

/**
* Run a benchmark that measures parquet write performance.
*
* @param testName the benchmark name to record with the measurement
* @param codec a compression codec
* @param columnNames the names of the pre-defined columns to generate
*/
void runParquetWriteTest(String testName, String codec, String... columnNames) {
var q = """
write(
source, '/data/source.ptr.parquet', compression_codec_name='${codec}'${parquetSettings}
)
""";
q = q.replace("${codec}", codec.equalsIgnoreCase("none") ? "UNCOMPRESSED" : codec);
q = q.replace("${parquetSettings}", useParquetDefaultSettings ? "" : (",\n " + parquetCfg));
runWriteTest(testName, q, columnNames);
}
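
For illustration (not part of the commit), a "none" codec maps to UNCOMPRESSED, and unless useParquetDefaultSettings() was called the explicit parquetCfg settings are appended, so the write query resolves roughly to:

// Illustrative resolved parquet write query (explicit settings in force).
String resolved = """
        write(
            source, '/data/source.ptr.parquet', compression_codec_name='UNCOMPRESSED',
            max_dictionary_keys=1048576, max_dictionary_size=1048576, target_page_size=65536
        )
        """;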

/**
* Run a benchmark that measures CSV write performance.
*
* @param testName the benchmark name to record with the measurement
* @param columnNames the names of the pre-defined columns to generate
*/
void runCsvWriteTest(String testName, String... columnNames) {
runWriteTest(testName, "write_csv(source, '/data/source.ptr.csv')", columnNames);
}

/**
* Run a benchmark that measures read performance. This test always runs after a corresponding write test.
*
* @param testName name that will appear in the results as the benchmark name
*/
private void runReadTest(String testName, String readQuery, String... columnNames) {
var q = """
bench_api_metrics_snapshot()
begin_time = time.perf_counter_ns()
-source = read('/data/source.ptr.parquet').select()
+source = ${readQuery}
end_time = time.perf_counter_ns()
bench_api_metrics_snapshot()
standard_metrics = bench_api_metrics_collect()
@@ -67,27 +115,19 @@ void runReadTest(String testName) {
long_col("result_row_count", [source.size])
])
""";
q = q.replace("${readQuery}", readQuery);
runTest(testName, q);
}

/**
* Run a benchmark that measures write performance using the given write query.
*
* @param testName the benchmark name to record with the measurement
* @param writeQuery the Deephaven query used to write the generated source table
* @param columnNames the names of the pre-defined columns to generate
*/
-void runWriteTest(String testName, String codec, String... columnNames) {
+private void runWriteTest(String testName, String writeQuery, String... columnNames) {
var q = """
source = merge([empty_table(${rowCount}).update([
${generators}
])] * ${scaleFactor})
bench_api_metrics_snapshot()
begin_time = time.perf_counter_ns()
-write(
-source, '/data/source.ptr.parquet', compression_codec_name='${codec}'${parquetSettings}
-)
+${writeQuery}
end_time = time.perf_counter_ns()
bench_api_metrics_snapshot()
standard_metrics = bench_api_metrics_collect()
@@ -98,11 +138,10 @@ void runWriteTest(String testName, String codec, String... columnNames) {
long_col("result_row_count", [source.size])
])
""";
q = q.replace("${writeQuery}", writeQuery);
q = q.replace("${rowCount}", "" + scaleRowCount);
q = q.replace("${scaleFactor}", "" + scaleFactor);
q = q.replace("${codec}", codec.equalsIgnoreCase("none") ? "UNCOMPRESSED" : codec);
q = q.replace("${generators}", getGenerators(columnNames));
q = q.replace("${parquetSettings}", useParquetDefaultSettings ? "" : (",\n " + parquetCfg));
runTest(testName, q);
}
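
Worked sizing example (assumed values, not from the commit): the template merges ${scaleFactor} copies of an empty_table(${rowCount}) update, so the written source table holds rowCount × scaleFactor rows.

// With the constructor's default scale.row.count of 100000 and an assumed
// scaleFactor of 5, the merged source table has 500,000 rows.
long scaleRowCount = 100_000;
int scaleFactor = 5;
long totalRows = scaleRowCount * scaleFactor; // 500000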

@@ -112,7 +151,7 @@ void runWriteTest(String testName, String codec, String... columnNames) {
* @param testName the benchmark name to record with the results
* @param query the test query to run
*/
-void runTest(String testName, String query) {
+private void runTest(String testName, String query) {
try {
api.setName(testName);
api.query(query).fetchAfter("stats", table -> {
@@ -139,7 +178,7 @@ void runTest(String testName, String query) {
* @param columnNames the column names to generate code for
* @return the lines of code needed to generate column data
*/
-String getGenerators(String... columnNames) {
+private String getGenerators(String... columnNames) {
return Arrays.stream(columnNames).map(c -> "'" + c + "=" + getGenerator(c) + "'")
.collect(Collectors.joining(",\n")) + '\n';
}
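
For illustration (hedged: the full getGenerator switch is collapsed in this view), assuming "intArr5" maps to the array5 expression defined below, wrapped in the one-in-ten null guard, getGenerators("intArr5") would produce:

// Illustrative generator line as embedded in the write template.
String line = "'intArr5=(ii % 10 == 0) ? null : "
        + "java.util.stream.IntStream.range((int)(ii % 5),(int)((ii % 5) + 5)).toArray()'\n";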
@@ -150,7 +189,7 @@ String getGenerators(String... columnNames) {
* @param columnName the column name to generate data for
* @return the data generation code
*/
-String getGenerator(final String columnName) {
+private String getGenerator(final String columnName) {
var array5 = "java.util.stream.IntStream.range((int)(ii % 5),(int)((ii % 5) + 5)).toArray()";
var array1K = "java.util.stream.IntStream.range((int)(ii % 1000),(int)((ii % 1000) + 1000)).toArray()";
var objArr5 = "java.util.stream.Stream.of(`1`,null,`3`,null,`5`).toArray()";
@@ -171,19 +210,42 @@ String getGenerator(final String columnName) {
return "(ii % 10 == 0) ? null : " + gen;
}

private String getTypes(String... cols) {
return "{" + Arrays.stream(cols).map(c -> "'" + c + "':" + getType(c)).collect(Collectors.joining(",")) + "}";
}

private String getType(String columnName) {
return switch (columnName) {
case "str10K" -> "dht.string";
case "long10K" -> "dht.long";
case "int10K" -> "dht.int_";
case "short10K" -> "dht.short";
case "bigDec10K" -> "dht.BigDecimal";
case "intArr5" -> "dht.int_array";
case "intVec5" -> "dht.int_array";
case "intArr1K" -> "dht.int_array";
case "intVec1K" -> "dht.int_array";
case "objArr5" -> "string_array";
case "objVec5" -> "string_array";
default -> throw new RuntimeException("Undefined column: " + columnName);
};
}

/**
* Initialize the test client and its properties. Restart Docker if it is local to the test client and the
* {@code docker.compose.file} property is set.
*
* @param testInst the test instance this runner is associated with.
* @return a new Bench API instance.
*/
-Bench initialize(Object testInst) {
+private Bench initialize(Object testInst) {
var query = """
import time
from deephaven import empty_table, garbage_collect, new_table, merge
from deephaven.column import long_col, double_col
from deephaven.parquet import read, write
+from deephaven import read_csv, write_csv
+from deephaven import dtypes as dht
""";

Bench api = Bench.create(testInst);
@@ -197,7 +259,7 @@ Bench initialize(Object testInst) {
*
* @param api the Bench API for this test runner.
*/
-void restartDocker(Bench api) {
+private void restartDocker(Bench api) {
var timer = api.timer();
if (!Exec.restartDocker(api.property("docker.compose.file", ""), api.property("deephaven.addr", "")))
return;