Skip to content

Commit

Permalink
feat: Collect GC, Compile Time and Heap Metrics (#383)
Browse files Browse the repository at this point in the history
  • Loading branch information
stanbrub authored Nov 13, 2024
1 parent 7e77700 commit 1306666
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void groupedTable(String name, String... groups) {
mainTable = name;
generateTable(name, null, groups);
}

public void setServices(String... services) {
requiredServices.clear();
requiredServices.addAll(Arrays.asList(services));
Expand Down Expand Up @@ -225,16 +225,16 @@ Result runStaticTest(String name, String operation, String read, String... loadC
${mainTable} = ${readTable}
loaded_tbl_size = ${mainTable}.size
${setupQueries}
garbage_collect()
${preOpQueries}
bench_api_metrics_start()
print('${logOperationBegin}')
begin_time = time.perf_counter_ns()
result = ${operation}
end_time = time.perf_counter_ns()
print('${logOperationEnd}')
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()
stats = new_table([
Expand All @@ -260,11 +260,9 @@ Result runIncTest(String name, String operation, String read, String... loadColu
if right:
right_filter = autotune(0, 1010000, 1.0, True)
right = right.where(right_filter)
print('Using Inc Right')
garbage_collect()
${preOpQueries}
bench_api_metrics_start()
print('${logOperationBegin}')
begin_time = time.perf_counter_ns()
result = ${operation}
Expand All @@ -280,6 +278,7 @@ Result runIncTest(String name, String operation, String read, String... loadColu
end_time = time.perf_counter_ns()
print('${logOperationEnd}')
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()
stats = new_table([
Expand All @@ -296,7 +295,7 @@ Result runTest(String name, String query, String operation, String read, String.
initialize(testInst);
api.setName(name);
stopUnusedServices(requiredServices);

query = query.replace("${readTable}", read);
query = query.replace("${mainTable}", mainTable);
query = query.replace("${loadSupportTables}", loadSupportTables());
Expand Down Expand Up @@ -381,16 +380,16 @@ void restartServices() {
if (!controller.restartService())
return;
var metrics = new Metrics(Timer.now(), "test-runner", "setup.services");
metrics.set("restart", timer.duration().toMillis(), "standard");
metrics.set("restart", timer.duration().toMillis() / 1000.0, "standard");
api.metrics().add(metrics);
}

/**
 * Ask the controller to stop services, passing it the given set, and record how long the call took as a
 * "stop" metric in seconds. Nothing is recorded when <code>controller.stopService</code> returns false.
 *
 * @param keepServices the set handed to <code>controller.stopService</code> (presumably the services to leave
 *        running; confirm against the controller)
 */
void stopUnusedServices(Set<String> keepServices) {
    var timer = api.timer();
    if (!controller.stopService(keepServices))
        return;
    var metrics = new Metrics(Timer.now(), "test-runner", "setup.services");
    // Report fractional seconds only; a whole-millisecond value under the same name would conflict with it
    metrics.set("stop", timer.duration().toMillis() / 1000.0, "standard");
    api.metrics().add(metrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,12 @@ void runCsvWriteTest(String testName, String... columnNames) {
*/
private void runReadTest(String testName, String readQuery, String... columnNames) {
var q = """
bench_api_metrics_init()
bench_api_metrics_start()
begin_time = time.perf_counter_ns()
source = ${readQuery}
end_time = time.perf_counter_ns()
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()
stats = new_table([
Expand All @@ -194,9 +197,12 @@ private void runWriteTest(String testName, String writeQuery, String... columnNa
else:
source = empty_table(${rowCount}).update([${generators}])
bench_api_metrics_init()
bench_api_metrics_start()
begin_time = time.perf_counter_ns()
${writeQuery}
end_time = time.perf_counter_ns()
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()
stats = new_table([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class KafkaTestRunner {
* If a {@code docker.compose.file} is specified in supplied runtime properties, restart the corresponding docker
* images with Deephaven max heap set to the given gigabytes.
*
* @param deephavenHeapGigs the number of gigabytes to use for Deephaven max heap
*/
void restartWithHeap(int deephavenHeapGigs) {
String dockerComposeFile = api.property("docker.compose.file", "");
Expand Down Expand Up @@ -106,6 +106,7 @@ void runTest(String operation, String tableType) {
import deephaven.dtypes as dht
bench_api_metrics_init()
bench_api_metrics_start()
kc_spec = ${kafkaConsumerSpec}
begin_time = time.perf_counter_ns()
Expand All @@ -121,6 +122,7 @@ void runTest(String operation, String tableType) {
${awaitTableLoad}
end_time = time.perf_counter_ns()
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()
stats = new_table([
Expand Down
107 changes: 91 additions & 16 deletions src/main/java/io/deephaven/benchmark/api/Snippets.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */
/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.api;

/**
* Contains snippets of query code that can be called inside a query
* Contains snippets of python functions that can be called inside a query executed on the Deephaven Engine
*/
class Snippets {
/**
Expand Down Expand Up @@ -74,7 +74,8 @@ with exclusive_lock(table):
""";

/**
* Initialize the container for storing benchmark metrics
* Initialize the container for storing benchmark metrics. Define functions for getting some MX Bean data for gc,
* jit and heap
* <p>
* ex. bench_api_metrics_init()
*/
Expand All @@ -85,10 +86,76 @@ def bench_api_metrics_init():
""";

/**
 * Get the MX bean for the given getter factory method that works from
 * <code>java.lang.management.ManagementFactory</code>
 * <p>
 * ex. bench_api_get_bean('getMemoryMXBean')
 *
 * @param bean_getter the name of a no-argument static getter on <code>ManagementFactory</code>
 * @return whatever that getter returns
 */
static String bench_api_get_bean = """
import jpy
def bench_api_get_bean(bean_getter):
    return getattr(jpy.get_type('java.lang.management.ManagementFactory'),bean_getter)()
""";

/**
 * Get the current JVM heap usage in bytes, read from the memory MX bean through
 * <code>bench_api_get_bean</code>
 * <p>
 * ex. used_bytes = bench_api_mem_usage()
 *
 * @return the used heap memory in bytes
 */
static String bench_api_mem_usage = """
def bench_api_mem_usage():
    return bench_api_get_bean('getMemoryMXBean').getHeapMemoryUsage().getUsed()
""";

/**
 * Get the accumulated compile time reported by the compilation MX bean through
 * <code>bench_api_get_bean</code>
 * <p>
 * ex. compile_millis = bench_api_compile_time()
 *
 * @return the total compilation time as reported by <code>getTotalCompilationTime</code> (milliseconds)
 */
static String bench_api_compile_time = """
def bench_api_compile_time():
    return bench_api_get_bean('getCompilationMXBean').getTotalCompilationTime()
""";

/**
 * Get the accumulated total time spent in GC and the GC count, summed over every garbage collector MX bean
 * <p>
 * ex. gc_time, gc_count = bench_api_gc_info()
 *
 * @return a tuple of the summed collection time (as reported by <code>getCollectionTime</code>, milliseconds)
 *         and the summed collection count
 */
static String bench_api_gc_info = """
def bench_api_gc_info():
    total = 0.0; count = 0
    beans = bench_api_get_bean('getGarbageCollectorMXBeans')
    for i in range(0, beans.size()):
        b = beans.get(i)
        total = total + b.getCollectionTime()
        count = count + b.getCollectionCount()
    return total, count
""";

/**
 * Run a garbage collection, then store the current compile time, GC time, GC count and heap usage in global
 * variables so that <code>bench_api_metrics_end</code> can compute the differences
 * <p>
 * ex. bench_api_metrics_start()
 */
static String bench_api_metrics_start = """
from deephaven import garbage_collect
def bench_api_metrics_start():
    global bench_mem_usage, bench_compile_time, bench_gc_time, bench_gc_count
    garbage_collect()
    bench_compile_time = bench_api_compile_time()
    bench_gc_time, bench_gc_count = bench_api_gc_info()
    bench_mem_usage = bench_api_mem_usage()
""";

/**
 * Get the difference from the <code>bench_api_metrics_start</code> values and add each one as a collected
 * metric in the 'operation' category: 'compile.time' and 'gc.time' (each divided by 1000.0), 'gc.count', and
 * 'heap.gain' (heap usage after a garbage collection minus the starting heap usage)
 * <p>
 * ex. bench_api_metrics_end()
 */
static String bench_api_metrics_end = """
from deephaven import garbage_collect
def bench_api_metrics_end():
    bench_api_metrics_add('operation','compile.time',(bench_api_compile_time()-bench_compile_time)/1000.0)
    gc_time, gc_count = bench_api_gc_info()
    bench_api_metrics_add('operation','gc.time',(gc_time - bench_gc_time)/1000.0)
    bench_api_metrics_add('operation','gc.count',gc_count - bench_gc_count)
    garbage_collect()
    bench_api_metrics_add('operation','heap.gain',bench_api_mem_usage() - bench_mem_usage)
""";

/**
* Add a metric to the accumulated list of metrics that will be transformed by
* <code>bench_api_metrics_collect</code> into a Deephaven table for retrieval
* <p>
* ex. bench_api_metrics_add('docker', 'restart.secs', 5.1, 'restart duration in between tests')
* ex. bench_api_metrics_add('docker', 'restart.secs', '5.1', 'restart duration in between tests')
*
* @param category the metric category
* @param name the name of the metric
Expand Down Expand Up @@ -126,18 +193,26 @@ def bench_api_metrics_collect():
* @return a query containing function definitions
*/
static String getFunctions(String query) {
    // A snippet is selected when the query mentions it OR when an already-selected snippet mentions it, so
    // each caller must be listed before the helpers it depends on (e.g. metrics_start before mem_usage,
    // and the bean-based helpers before bench_api_get_bean)
    String defs = "";
    defs += getFunc("bench_api_kafka_consume", bench_api_kafka_consume, query, defs);
    defs += getFunc("bench_api_await_table_size", bench_api_await_table_size, query, defs);
    defs += getFunc("bench_api_metrics_init", bench_api_metrics_init, query, defs);
    defs += getFunc("bench_api_metrics_start", bench_api_metrics_start, query, defs);
    defs += getFunc("bench_api_metrics_end", bench_api_metrics_end, query, defs);
    defs += getFunc("bench_api_mem_usage", bench_api_mem_usage, query, defs);
    defs += getFunc("bench_api_compile_time", bench_api_compile_time, query, defs);
    defs += getFunc("bench_api_gc_info", bench_api_gc_info, query, defs);
    defs += getFunc("bench_api_get_bean", bench_api_get_bean, query, defs);
    defs += getFunc("bench_api_metrics_add", bench_api_metrics_add, query, defs);
    defs += getFunc("bench_api_metrics_collect", bench_api_metrics_collect, query, defs);
    defs += getFunc("bench_api_await_column_value_limit", bench_api_await_column_value_limit, query, defs);
    return defs;
}

static String getFunction(String functionName, String functionDef, String query) {
return query.contains(functionName) ? (functionDef + System.lineSeparator()) : "";
static String getFunc(String functionName, String functionDef, String query, String funcs) {
if (!query.contains(functionName) && !funcs.contains(functionName))
return "";
return functionDef + System.lineSeparator();
}

}

0 comments on commit 1306666

Please sign in to comment.