Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Collect GC, Compile Time and Heap Metrics #383

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void groupedTable(String name, String... groups) {
mainTable = name;
generateTable(name, null, groups);
}

public void setServices(String... services) {
requiredServices.clear();
requiredServices.addAll(Arrays.asList(services));
Expand Down Expand Up @@ -225,16 +225,16 @@ Result runStaticTest(String name, String operation, String read, String... loadC
${mainTable} = ${readTable}
loaded_tbl_size = ${mainTable}.size
${setupQueries}

garbage_collect()

${preOpQueries}
bench_api_metrics_start()
print('${logOperationBegin}')

begin_time = time.perf_counter_ns()
result = ${operation}
end_time = time.perf_counter_ns()

print('${logOperationEnd}')
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()

stats = new_table([
Expand All @@ -260,11 +260,9 @@ Result runIncTest(String name, String operation, String read, String... loadColu
if right:
right_filter = autotune(0, 1010000, 1.0, True)
right = right.where(right_filter)
print('Using Inc Right')

garbage_collect()

${preOpQueries}
bench_api_metrics_start()
print('${logOperationBegin}')
begin_time = time.perf_counter_ns()
result = ${operation}
Expand All @@ -280,6 +278,7 @@ Result runIncTest(String name, String operation, String read, String... loadColu

end_time = time.perf_counter_ns()
print('${logOperationEnd}')
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()

stats = new_table([
Expand All @@ -296,7 +295,7 @@ Result runTest(String name, String query, String operation, String read, String.
initialize(testInst);
api.setName(name);
stopUnusedServices(requiredServices);

query = query.replace("${readTable}", read);
query = query.replace("${mainTable}", mainTable);
query = query.replace("${loadSupportTables}", loadSupportTables());
Expand Down Expand Up @@ -381,16 +380,16 @@ void restartServices() {
if (!controller.restartService())
return;
var metrics = new Metrics(Timer.now(), "test-runner", "setup.services");
metrics.set("restart", timer.duration().toMillis(), "standard");
metrics.set("restart", timer.duration().toMillis() / 1000.0, "standard");
api.metrics().add(metrics);
}

void stopUnusedServices(Set<String> keepServices) {
var timer = api.timer();
if (!controller.stopService(keepServices))
return;
var metrics = new Metrics(Timer.now(), "test-runner", "setup.services");
metrics.set("stop", timer.duration().toMillis(), "standard");
metrics.set("stop", timer.duration().toMillis() / 1000.0, "standard");
api.metrics().add(metrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,12 @@ void runCsvWriteTest(String testName, String... columnNames) {
*/
private void runReadTest(String testName, String readQuery, String... columnNames) {
var q = """
bench_api_metrics_init()
bench_api_metrics_start()
begin_time = time.perf_counter_ns()
source = ${readQuery}
end_time = time.perf_counter_ns()
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()

stats = new_table([
Expand All @@ -194,9 +197,12 @@ private void runWriteTest(String testName, String writeQuery, String... columnNa
else:
source = empty_table(${rowCount}).update([${generators}])

bench_api_metrics_init()
bench_api_metrics_start()
begin_time = time.perf_counter_ns()
${writeQuery}
end_time = time.perf_counter_ns()
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()

stats = new_table([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class KafkaTestRunner {
* If a {@code docker.compose.file} is specified in supplied runtime properties, restart the corresponding docker
* images with Deephaven max heap set to the given gigabytes.
*
* @param deephavenHeapGigs the number of gigabytes to use for Deephave max heap
* @param deephavenHeapGigs the number of gigabytes to use for Deephaven max heap
*/
void restartWithHeap(int deephavenHeapGigs) {
String dockerComposeFile = api.property("docker.compose.file", "");
Expand Down Expand Up @@ -106,6 +106,7 @@ void runTest(String operation, String tableType) {
import deephaven.dtypes as dht

bench_api_metrics_init()
bench_api_metrics_start()
kc_spec = ${kafkaConsumerSpec}

begin_time = time.perf_counter_ns()
Expand All @@ -121,6 +122,7 @@ void runTest(String operation, String tableType) {
${awaitTableLoad}

end_time = time.perf_counter_ns()
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()

stats = new_table([
Expand Down
107 changes: 91 additions & 16 deletions src/main/java/io/deephaven/benchmark/api/Snippets.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */
/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.api;

/**
* Contains snippets of query code that can be called inside a query
* Contains snippets of python functions that can be called inside a query executed on the Deephaven Engine
*/
class Snippets {
/**
Expand Down Expand Up @@ -74,7 +74,8 @@ with exclusive_lock(table):
""";

/**
* Initialize the container for storing benchmark metrics
* Initialize the container for storing benchmark metrics. Define functions for getting some MX Bean data for gc,
* jit and heap
* <p>
* ex. bench_api_metrics_init()
*/
Expand All @@ -85,10 +86,76 @@ def bench_api_metrics_init():
""";

/**
* Captures the value of the first column in a table every Deephaven ticking interval and does not allow advancement
* in the current query logic until that value is reached
* Get the MX bean for the given getter factory method that works from
* <code>java.lang.management.ManagementFactory</code>
*/
static String bench_api_get_bean = """
import jpy
def bench_api_get_bean(bean_getter):
return getattr(jpy.get_type('java.lang.management.ManagementFactory'),bean_getter)()
""";

/**
* Get the current JVM heap usage in bytes
*/
static String bench_api_mem_usage = """
def bench_api_mem_usage():
return bench_api_get_bean('getMemoryMXBean').getHeapMemoryUsage().getUsed()
""";

/**
* Get the accumulated compile time
*/
static String bench_api_compile_time = """
def bench_api_compile_time():
return bench_api_get_bean('getCompilationMXBean').getTotalCompilationTime()
""";

/**
* Get the accumulated total time spent in GC and GC count
*/
static String bench_api_gc_info = """
def bench_api_gc_info():
total = 0.0; count = 0
beans = bench_api_get_bean('getGarbageCollectorMXBeans')
for i in range(0, beans.size()):
b = beans.get(i)
total = total + b.getCollectionTime()
count = count + b.getCollectionCount()
return total, count
""";

/**
* Set heap usage, compile time, GC time and GC Count to global variables
*/
static String bench_api_metrics_start = """
from deephaven import garbage_collect
def bench_api_metrics_start():
global bench_mem_usage, bench_compile_time, bench_gc_time, bench_gc_count
garbage_collect()
bench_compile_time = bench_api_compile_time()
bench_gc_time, bench_gc_count = bench_api_gc_info()
bench_mem_usage = bench_api_mem_usage()
""";

/**
* Get difference from <code>bench_api_metrics_start values and add as collected metrics
*/
static String bench_api_metrics_end = """
def bench_api_metrics_end():
bench_api_metrics_add('operation','compile.time',(bench_api_compile_time()-bench_compile_time)/1000.0)
gc_time, gc_count = bench_api_gc_info()
bench_api_metrics_add('operation','gc.time',(gc_time - bench_gc_time)/1000.0)
bench_api_metrics_add('operation','gc.count',gc_count - bench_gc_count)
garbage_collect()
bench_api_metrics_add('operation','heap.gain',bench_api_mem_usage() - bench_mem_usage)
""";

/**
* Add a metrics to the accumulated list of metrics that will be transformed by
* <code>bench_api_metrics_collect</code> into a Deephaven table for retrieval
* <p>
* ex. bench_api_metrics_add('docker', 'restart.secs', 5.1, 'restart duration in between tests')
* ex. bench_api_metrics_add('docker', 'restart.secs', '5.1', 'restart duration in between tests')
*
* @param category the metric category
* @param name the name of the metric
Expand Down Expand Up @@ -126,18 +193,26 @@ def bench_api_metrics_collect():
* @return a query containing function definitions
*/
static String getFunctions(String query) {
String functionDefs = "";
functionDefs += getFunction("bench_api_kafka_consume", bench_api_kafka_consume, query);
functionDefs += getFunction("bench_api_await_table_size", bench_api_await_table_size, query);
functionDefs += getFunction("bench_api_metrics_init", bench_api_metrics_init, query);
functionDefs += getFunction("bench_api_metrics_add", bench_api_metrics_add, query);
functionDefs += getFunction("bench_api_metrics_collect", bench_api_metrics_collect, query);
functionDefs += getFunction("bench_api_await_column_value_limit", bench_api_await_column_value_limit, query);
return functionDefs;
String defs = "";
defs += getFunc("bench_api_kafka_consume", bench_api_kafka_consume, query, "");
defs += getFunc("bench_api_await_table_size", bench_api_await_table_size, query, defs);
defs += getFunc("bench_api_metrics_init", bench_api_metrics_init, query, defs);
defs += getFunc("bench_api_metrics_start", bench_api_metrics_start, query, defs);
defs += getFunc("bench_api_metrics_end", bench_api_metrics_end, query, defs);
defs += getFunc("bench_api_mem_usage", bench_api_mem_usage, query, defs);
defs += getFunc("bench_api_compile_time", bench_api_compile_time, query, defs);
defs += getFunc("bench_api_gc_info", bench_api_gc_info, query, defs);
defs += getFunc("bench_api_get_bean", bench_api_get_bean, query, defs);
defs += getFunc("bench_api_metrics_add", bench_api_metrics_add, query, defs);
defs += getFunc("bench_api_metrics_collect", bench_api_metrics_collect, query, defs);
defs += getFunc("bench_api_await_column_value_limit", bench_api_await_column_value_limit, query, defs);
return defs;
}

static String getFunction(String functionName, String functionDef, String query) {
return query.contains(functionName) ? (functionDef + System.lineSeparator()) : "";
static String getFunc(String functionName, String functionDef, String query, String funcs) {
if (!query.contains(functionName) && !funcs.contains(functionName))
return "";
return functionDef + System.lineSeparator();
}

}