From 278cb85205558efa99d3814c8db2c3092f24cbf8 Mon Sep 17 00:00:00 2001 From: stanbrub Date: Tue, 12 Nov 2024 10:36:03 -0700 Subject: [PATCH 1/3] Added gc, compile time, and heap metrics --- .../tests/standard/StandardTestRunner.java | 23 ++-- .../io/deephaven/benchmark/api/Snippets.java | 105 +++++++++++++++--- 2 files changed, 101 insertions(+), 27 deletions(-) diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java index 7ac925c..0ba3983 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java @@ -90,7 +90,7 @@ public void groupedTable(String name, String... groups) { mainTable = name; generateTable(name, null, groups); } - + public void setServices(String... services) { requiredServices.clear(); requiredServices.addAll(Arrays.asList(services)); @@ -225,16 +225,16 @@ Result runStaticTest(String name, String operation, String read, String... loadC ${mainTable} = ${readTable} loaded_tbl_size = ${mainTable}.size ${setupQueries} - - garbage_collect() - ${preOpQueries} + bench_api_metrics_start() print('${logOperationBegin}') - + begin_time = time.perf_counter_ns() result = ${operation} end_time = time.perf_counter_ns() + print('${logOperationEnd}') + bench_api_metrics_end() standard_metrics = bench_api_metrics_collect() stats = new_table([ @@ -260,11 +260,9 @@ Result runIncTest(String name, String operation, String read, String... loadColu if right: right_filter = autotune(0, 1010000, 1.0, True) right = right.where(right_filter) - print('Using Inc Right') - - garbage_collect() ${preOpQueries} + bench_api_metrics_start() print('${logOperationBegin}') begin_time = time.perf_counter_ns() result = ${operation} @@ -280,6 +278,7 @@ Result runIncTest(String name, String operation, String read, String... loadColu end_time = time.perf_counter_ns() print('${logOperationEnd}') + bench_api_metrics_end() standard_metrics = bench_api_metrics_collect() stats = new_table([ @@ -296,7 +295,7 @@ Result runTest(String name, String query, String operation, String read, String. initialize(testInst); api.setName(name); stopUnusedServices(requiredServices); - + query = query.replace("${readTable}", read); query = query.replace("${mainTable}", mainTable); query = query.replace("${loadSupportTables}", loadSupportTables()); @@ -381,16 +380,16 @@ void restartServices() { if (!controller.restartService()) return; var metrics = new Metrics(Timer.now(), "test-runner", "setup.services"); - metrics.set("restart", timer.duration().toMillis(), "standard"); + metrics.set("restart", timer.duration().toMillis() / 1000.0, "standard"); api.metrics().add(metrics); } - + void stopUnusedServices(Set keepServices) { var timer = api.timer(); if (!controller.stopService(keepServices)) return; var metrics = new Metrics(Timer.now(), "test-runner", "setup.services"); - metrics.set("stop", timer.duration().toMillis(), "standard"); + metrics.set("stop", timer.duration().toMillis() / 1000.0, "standard"); api.metrics().add(metrics); } diff --git a/src/main/java/io/deephaven/benchmark/api/Snippets.java b/src/main/java/io/deephaven/benchmark/api/Snippets.java index f491db7..34cabaa 100644 --- a/src/main/java/io/deephaven/benchmark/api/Snippets.java +++ b/src/main/java/io/deephaven/benchmark/api/Snippets.java @@ -2,7 +2,7 @@ package io.deephaven.benchmark.api; /** - * Contains snippets of query code that can be called inside a query + * Contains snippets of python functions that can be called inside a query executed on the Deephaven Engine */ class Snippets { /** @@ -74,7 +74,8 @@ with exclusive_lock(table): """; /** - * Initialize the container for storing benchmark metrics + * Initialize the container for storing benchmark metrics. Define functions for getting some MX Bean data for gc, + * jit and heap *

* ex. bench_api_metrics_init() */ @@ -85,10 +86,76 @@ def bench_api_metrics_init(): """; /** - * Captures the value of the first column in a table every Deephaven ticking interval and does not allow advancement - * in the current query logic until that value is reached + * Get the MX bean for the given getter factory method that works from + * java.lang.management.ManagementFactory + */ + static String bench_api_get_bean = """ + import jpy + def bench_api_get_bean(bean_getter): + return getattr(jpy.get_type('java.lang.management.ManagementFactory'),bean_getter)() + """; + + /** + * Get the current JVM heap usage in bytes + */ + static String bench_api_mem_usage = """ + def bench_api_mem_usage(): + return bench_api_get_bean('getMemoryMXBean').getHeapMemoryUsage().getUsed() + """; + + /** + * Get the accumulated compile time + */ + static String bench_api_compile_time = """ + def bench_api_compile_time(): + return bench_api_get_bean('getCompilationMXBean').getTotalCompilationTime() + """; + + /** + * Get the accumulated total time spent in GC and GC count + */ + static String bench_api_gc_info = """ + def bench_api_gc_info(): + total = 0.0; count = 0 + beans = bench_api_get_bean('getGarbageCollectorMXBeans') + for i in range(0, beans.size()): + b = beans.get(i) + total = total + b.getCollectionTime() + count = count + b.getCollectionCount() + return total, count + """; + + /** + * Set heap usage, compile time, GC time and GC Count to global variables + */ + static String bench_api_metrics_start = """ + from deephaven import garbage_collect + def bench_api_metrics_start(): + global bench_mem_usage, bench_compile_time, bench_gc_time, bench_gc_count + garbage_collect() + bench_compile_time = bench_api_compile_time() + bench_gc_time, bench_gc_count = bench_api_gc_info() + bench_mem_usage = bench_api_mem_usage() + """; + + /** + * Get difference from bench_api_metrics_start values and add as collected metrics + */ + static String bench_api_metrics_end = """ + def bench_api_metrics_end(): + bench_api_metrics_add('operation','compile.time',(bench_api_compile_time()-bench_compile_time)/1000.0) + gc_time, gc_count = bench_api_gc_info() + bench_api_metrics_add('operation','gc.time',(gc_time - bench_gc_time)/1000.0) + bench_api_metrics_add('operation','gc.count',gc_count - bench_gc_count) + garbage_collect() + bench_api_metrics_add('operation','heap.gain',bench_api_mem_usage() - bench_mem_usage) + """; + + /** + * Add a metrics to the accumulated list of metrics that will be transformed by + * bench_api_metrics_collect into a Deephaven table for retrieval *

- * ex. bench_api_metrics_add('docker', 'restart.secs', 5.1, 'restart duration in between tests') + * ex. bench_api_metrics_add('docker', 'restart.secs', '5.1', 'restart duration in between tests') * * @param category the metric category * @param name the name of the metric @@ -126,18 +193,26 @@ def bench_api_metrics_collect(): * @return a query containing function definitions */ static String getFunctions(String query) { - String functionDefs = ""; - functionDefs += getFunction("bench_api_kafka_consume", bench_api_kafka_consume, query); - functionDefs += getFunction("bench_api_await_table_size", bench_api_await_table_size, query); - functionDefs += getFunction("bench_api_metrics_init", bench_api_metrics_init, query); - functionDefs += getFunction("bench_api_metrics_add", bench_api_metrics_add, query); - functionDefs += getFunction("bench_api_metrics_collect", bench_api_metrics_collect, query); - functionDefs += getFunction("bench_api_await_column_value_limit", bench_api_await_column_value_limit, query); - return functionDefs; + String defs = ""; + defs += getFunc("bench_api_kafka_consume", bench_api_kafka_consume, query, ""); + defs += getFunc("bench_api_await_table_size", bench_api_await_table_size, query, defs); + defs += getFunc("bench_api_metrics_init", bench_api_metrics_init, query, defs); + defs += getFunc("bench_api_metrics_start", bench_api_metrics_start, query, defs); + defs += getFunc("bench_api_metrics_end", bench_api_metrics_end, query, defs); + defs += getFunc("bench_api_mem_usage", bench_api_mem_usage, query, defs); + defs += getFunc("bench_api_compile_time", bench_api_compile_time, query, defs); + defs += getFunc("bench_api_gc_info", bench_api_gc_info, query, defs); + defs += getFunc("bench_api_get_bean", bench_api_get_bean, query, defs); + defs += getFunc("bench_api_metrics_add", bench_api_metrics_add, query, defs); + defs += getFunc("bench_api_metrics_collect", bench_api_metrics_collect, query, defs); + defs += getFunc("bench_api_await_column_value_limit", bench_api_await_column_value_limit, query, defs); + return defs; } - static String getFunction(String functionName, String functionDef, String query) { - return query.contains(functionName) ? (functionDef + System.lineSeparator()) : ""; + static String getFunc(String functionName, String functionDef, String query, String funcs) { + if (!query.contains(functionName) && !funcs.contains(functionName)) + return ""; + return functionDef + System.lineSeparator(); } } From 2b1ea4fa67573cdc6ab6260b3c48779ae9897943 Mon Sep 17 00:00:00 2001 From: stanbrub Date: Wed, 13 Nov 2024 11:27:54 -0700 Subject: [PATCH 2/3] Change copyright --- src/main/java/io/deephaven/benchmark/api/Snippets.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/deephaven/benchmark/api/Snippets.java b/src/main/java/io/deephaven/benchmark/api/Snippets.java index 34cabaa..67344ef 100644 --- a/src/main/java/io/deephaven/benchmark/api/Snippets.java +++ b/src/main/java/io/deephaven/benchmark/api/Snippets.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.api; /** From 144997cfb95948e6fa580f60d99e210f1a57b764 Mon Sep 17 00:00:00 2001 From: stanbrub Date: Wed, 13 Nov 2024 13:50:43 -0700 Subject: [PATCH 3/3] Add metrics collection to Kafka and File Tests --- .../benchmark/tests/standard/file/FileTestRunner.java | 6 ++++++ .../benchmark/tests/standard/kafka/KafkaTestRunner.java | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java index 13b5b05..4e2315f 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java @@ -170,9 +170,12 @@ void runCsvWriteTest(String testName, String... columnNames) { */ private void runReadTest(String testName, String readQuery, String... columnNames) { var q = """ + bench_api_metrics_init() + bench_api_metrics_start() begin_time = time.perf_counter_ns() source = ${readQuery} end_time = time.perf_counter_ns() + bench_api_metrics_end() standard_metrics = bench_api_metrics_collect() stats = new_table([ @@ -194,9 +197,12 @@ private void runWriteTest(String testName, String writeQuery, String... columnNa else: source = empty_table(${rowCount}).update([${generators}]) + bench_api_metrics_init() + bench_api_metrics_start() begin_time = time.perf_counter_ns() ${writeQuery} end_time = time.perf_counter_ns() + bench_api_metrics_end() standard_metrics = bench_api_metrics_collect() stats = new_table([ diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java index 650edae..d65014d 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java @@ -44,7 +44,7 @@ class KafkaTestRunner { * If a {@code docker.compose.file} is specified in supplied runtime properties, restart the corresponding docker * images with Deephaven max heap set to the given gigabytes. * - * @param deephavenHeapGigs the number of gigabytes to use for Deephave max heap + * @param deephavenHeapGigs the number of gigabytes to use for Deephaven max heap */ void restartWithHeap(int deephavenHeapGigs) { String dockerComposeFile = api.property("docker.compose.file", ""); @@ -106,6 +106,7 @@ void runTest(String operation, String tableType) { import deephaven.dtypes as dht bench_api_metrics_init() + bench_api_metrics_start() kc_spec = ${kafkaConsumerSpec} begin_time = time.perf_counter_ns() @@ -121,6 +122,7 @@ void runTest(String operation, String tableType) { ${awaitTableLoad} end_time = time.perf_counter_ns() + bench_api_metrics_end() standard_metrics = bench_api_metrics_collect() stats = new_table([