Parquet test improvements #175

Merged: 10 commits on Oct 13, 2023
@@ -215,7 +215,7 @@ Result runIncTest(String name, String operation, String read, String... loadColu
stats = new_table([
double_col("elapsed_nanos", [end_time - begin_time]),
long_col("processed_row_count", [loaded_tbl_size]),
long_col("result_row_count", [result.size]),
long_col("result_row_count", [result.size])
])
""";
return runTest(name + " -Inc", incQuery, operation, read, loadColumns);
@@ -245,7 +245,8 @@ Result runTest(String name, String query, String operation, String read, String.
var metrics = new Metrics(Timer.now(), "test-runner", "setup", "test");
metrics.set("static_scale_factor", staticFactor);
metrics.set("inc_scale_factor", incFactor);
metrics.set("row count factor", rowCountFactor);
metrics.set("row_count_factor", rowCountFactor);
api.metrics().add(metrics);
}).execute();
api.result().test("deephaven-engine", result.get().elapsedTime(), result.get().loadedRowCount());
return result.get();
@@ -0,0 +1,95 @@
package io.deephaven.benchmark.tests.standard.parquet;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;

/**
* Standard tests for writing/reading multi-column data with different compression codecs. To save time, the parquet
* generated by the "write" tests is used by the "read" tests.
*/

// Need to add array and vector tests. Look at adding iterations

@TestMethodOrder(OrderAnnotation.class)
public class ParquetCodecTest {
final ParquetTestRunner runner = new ParquetTestRunner(this);
final String[] usedColumns = {"str10K", "long10K", "int10K", "short10K", "bigDec10K", "array1K", "vector1K"};

@BeforeEach
public void setup() {
runner.setScaleFactors(5, 1);
}

@Test
@Order(1)
public void writeMultiColSnappy() {
runner.runWriteTest("ParquetWrite- Snappy Multi Col -Static", "SNAPPY", usedColumns);
}

@Test
@Order(2)
public void readMultiColSnappy() {
runner.runReadTest("ParquetRead- Snappy Multi Col -Static");
}

@Test
@Order(3)
public void writeMultiColZstd() {
runner.runWriteTest("ParquetWrite- Zstd Multi Col -Static", "ZSTD", usedColumns);
}

@Test
@Order(4)
public void readMultiColZstd() {
runner.runReadTest("ParquetRead- Zstd Multi Col -Static");
}

@Test
@Order(5)
public void writeMultiColLzo() {
runner.runWriteTest("ParquetWrite- Lzo Multi Col -Static", "LZO", usedColumns);
}

@Test
@Order(6)
public void readMultiColLzo() {
runner.runReadTest("ParquetRead- Lzo Multi Col -Static");
}

@Test
@Order(7)
public void writeMultiColLz4Raw() {
runner.runWriteTest("ParquetWrite- Lz4Raw Multi Col -Static", "LZ4_RAW", usedColumns);
}

@Test
@Order(8)
public void readMultiColLz4Raw() {
runner.runReadTest("ParquetRead- Lz4Raw Multi Col -Static");
}

@Test
@Order(9)
public void writeMultiColGzip() {
runner.runWriteTest("ParquetWrite- Gzip Multi Col -Static", "GZIP", usedColumns);
}

@Test
@Order(10)
public void readMultiColGzip() {
runner.runReadTest("ParquetRead- Gzip Multi Col -Static");
}

@Test
@Order(11)
public void writeMultiColNone() {
runner.runWriteTest("ParquetWrite- No Codec Multi Col -Static", "NONE", usedColumns);
}

@Test
@Order(12)
public void readMultiColNone() {
runner.runReadTest("ParquetRead- No Codec Multi Col -Static");
}

}
@@ -0,0 +1,54 @@
package io.deephaven.benchmark.tests.standard.parquet;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;

/**
* Standard tests for writing single column parquet for different column types.
*/
public class ParquetSingleColTest {
final ParquetTestRunner runner = new ParquetTestRunner(this);

@Test
public void writeOneStringCol() {
runner.setScaleFactors(5, 50);
runner.runWriteTest("ParquetWrite- 1 String Col -Static", "SNAPPY", "str10K");
}

@Test
public void writeOneBigDecimalCol() {
runner.setScaleFactors(5, 12);
runner.runWriteTest("ParquetWrite- 1 Big Decimal Col -Static", "SNAPPY", "bigDec10K");
}

@Test
public void writeOneLongCol() {
runner.setScaleFactors(5, 30);
runner.runWriteTest("ParquetWrite- 1 Long Col -Static", "SNAPPY", "long10K");
}

@Test
public void writeOneIntCol() {
runner.setScaleFactors(5, 60);
runner.runWriteTest("ParquetWrite- 1 Int Col -Static", "SNAPPY", "int10K");
}

@Test
public void writeOneShortCol() {
runner.setScaleFactors(5, 70);
runner.runWriteTest("ParquetWrite- 1 Short Col -Static", "SNAPPY", "short10K");
}

@Test
public void writeOneArrayCol() {
runner.setScaleFactors(5, 2);
runner.runWriteTest("ParquetWrite- 1 Int Array Col -Static", "SNAPPY", "array1K");
}

@Test
public void writeOneVectorCol() {
runner.setScaleFactors(5, 2);
runner.runWriteTest("ParquetWrite- 1 Int Vector Col -Static", "SNAPPY", "vector1K");
}

}
@@ -0,0 +1,208 @@
package io.deephaven.benchmark.tests.standard.parquet;

import static org.junit.jupiter.api.Assertions.assertEquals;
import java.time.Duration;
import java.util.Arrays;
import java.util.stream.Collectors;
import io.deephaven.benchmark.api.Bench;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.util.Exec;
import io.deephaven.benchmark.util.Timer;

/**
* Test reading and writing parquet files with various data types and compression codecs.
*/
class ParquetTestRunner {
final Object testInst;
final Bench api;
private int rowCountFactor = 1;
private int scaleFactor = 1;
private long scaleRowCount;

ParquetTestRunner(Object testInst) {
this.testInst = testInst;
this.api = initialize(testInst);
this.scaleRowCount = api.propertyAsIntegral("scale.row.count", "100000");
}

/**
* Set a multiplier for the generated rows and a multiplier for simulating more rows with {@code merge}
*
* @param rowCountFactor the multiplier for the scale.row.count property
* @param scaleFactor the multiplier for how many merges to do on the generated table to simulate more rows
*/
void setScaleFactors(int rowCountFactor, int scaleFactor) {
this.rowCountFactor = rowCountFactor;
this.scaleRowCount = (long) (api.propertyAsIntegral("scale.row.count", "100000") * rowCountFactor);
this.scaleFactor = scaleFactor;
}
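// Worked example (a sketch, assuming the default scale.row.count of 100000):
//   runner.setScaleFactors(5, 50);
//   scaleRowCount = 100000 * 5  = 500000     rows generated by empty_table
//   merged rows   = 500000 * 50 = 25000000   rows processed by the write query after merge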

/**
* Run a benchmark that measures parquet read performance. This test always runs after a corresponding write test.
*
* @param testName name that will appear in the results as the benchmark name
*/
void runReadTest(String testName) {
var q = """
bench_api_metrics_snapshot()
begin_time = time.perf_counter_ns()
source = read('/data/source.ptr.parquet').select()
end_time = time.perf_counter_ns()
bench_api_metrics_snapshot()
standard_metrics = bench_api_metrics_collect()

stats = new_table([
double_col("elapsed_nanos", [end_time - begin_time]),
long_col("processed_row_count", [source.size]),
long_col("result_row_count", [source.size])
])
""";
runTest(testName, q);
}

/**
* Run a benchmark that measures parquet write performance.
*
* @param testName the benchmark name to record with the measurement
* @param codec a compression codec
* @param columnNames the names of the pre-defined columns to generate
*/
void runWriteTest(String testName, String codec, String... columnNames) {
var q = """
source = merge([empty_table(${rowCount}).update([
${generators}
])] * ${scaleFactor})

bench_api_metrics_snapshot()
begin_time = time.perf_counter_ns()
write(
source, '/data/source.ptr.parquet', compression_codec_name='${codec}',
max_dictionary_keys=2000000, max_dictionary_size=20000000, target_page_size=2000000
)
end_time = time.perf_counter_ns()
bench_api_metrics_snapshot()
standard_metrics = bench_api_metrics_collect()

stats = new_table([
double_col("elapsed_nanos", [end_time - begin_time]),
long_col("processed_row_count", [source.size]),
long_col("result_row_count", [source.size])
])
""";
q = q.replace("${rowCount}", "" + scaleRowCount);
q = q.replace("${scaleFactor}", "" + scaleFactor);
q = q.replace("${codec}", codec.equalsIgnoreCase("none") ? "UNCOMPRESSED" : codec);
q = q.replace("${generators}", getGenerators(columnNames));
runTest(testName, q);
}
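// Illustration only (a sketch of the substituted query, not the exact emitted text):
// runWriteTest("ParquetWrite- 1 Long Col -Static", "SNAPPY", "long10K") with scaleRowCount=500000
// and scaleFactor=30 produces roughly:
//   source = merge([empty_table(500000).update([
//       'long10K=(ii % 10 == 0) ? null : (ii % 10000)'
//   ])] * 30)
//   write(source, '/data/source.ptr.parquet', compression_codec_name='SNAPPY', ...)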

/**
* Run the test through the Barrage Java client, collect the results, and report them.
*
* @param testName the benchmark name to record with the results
* @param query the test query to run
*/
void runTest(String testName, String query) {
try {
api.setName(testName);
api.query(query).fetchAfter("stats", table -> {
long rowCount = table.getSum("processed_row_count").longValue();
long elapsedNanos = table.getSum("elapsed_nanos").longValue();
long resultRowCount = table.getSum("result_row_count").longValue();
assertEquals(scaleRowCount * scaleFactor, resultRowCount);
api.result().test("deephaven-engine", Duration.ofNanos(elapsedNanos), rowCount);
}).fetchAfter("standard_metrics", table -> {
api.metrics().add(table);
var metrics = new Metrics(Timer.now(), "test-runner", "setup", "test");
metrics.set("static_scale_factor", scaleFactor);
metrics.set("row_count_factor", rowCountFactor);
api.metrics().add(metrics);
}).execute();
} finally {
api.close();
}
}
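// For example (a sketch): writeOneLongCol uses rowCountFactor=5 and scaleFactor=30, so the
// fetched result_row_count must equal 500000 * 30 = 15000000 for the benchmark to pass.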

/**
* Get the lines of code required to generate the data for pre-defined column names
*
* @param columnNames the column names to generate code for
* @return the lines of code needed to generate column data
*/
String getGenerators(String... columnNames) {
return Arrays.stream(columnNames).map(c -> "'" + c + "=" + getGenerator(c) + "'")
.collect(Collectors.joining(",\n")) + '\n';
}
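// Example output (a sketch): getGenerators("str10K", "int10K") returns
//   'str10K=(ii % 10 == 0) ? null : (`` + (ii % 10000))',
//   'int10K=(ii % 10 == 0) ? null : ((int)(ii % 10000))'
// which is spliced into the update([...]) clause of the write query.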

/**
* Get the code needed for generating data for the given pre-defined column name.
*
* @param columnName the column name to generate data for
* @return the data generation code
*/
String getGenerator(String columnName) {
String g = "";
switch (columnName) {
case "str10K":
g = "(ii % 10 == 0) ? null : (`` + (ii % 10000))";
break;
case "long10K":
g = "(ii % 10 == 0) ? null : (ii % 10000)";
break;
case "int10K":
g = "(ii % 10 == 0) ? null : ((int)(ii % 10000))";
break;
case "short10K":
g = "(ii % 10 == 0) ? null : ((short)(ii % 10000))";
break;
case "bigDec10K":
g = "(ii % 10 == 0) ? null : java.math.BigDecimal.valueOf(ii % 10000)";
break;
case "array1K":
g = "(ii % 10 == 0) ? null : new int[]{i,i+1,i+2,i+3,i+4,i+5}";
break;
case "vector1K":
g = "(ii % 10 == 0) ? null : new io.deephaven.vector.IntVectorDirect(i,i+1,i+2,i+3,i+4,i+5)";
break;
default:
throw new RuntimeException("Undefined column: " + columnName);
}
return g;
}

/**
* Initialize the test client and its properties. Restart Docker if it is local to the test client and the
* {@code docker.compose.file} property is set.
*
* @param testInst the test instance this runner is associated with.
* @return a new Bench API instance.
*/
Bench initialize(Object testInst) {
var query = """
import time
from deephaven import empty_table, garbage_collect, new_table, merge
from deephaven.column import long_col, double_col
from deephaven.parquet import read, write
""";

Bench api = Bench.create(testInst);
restartDocker(api);
api.query(query).execute();
return api;
}

/**
* Restart Docker if it is local to this test runner and the {@code docker.compose.file} property is set.
*
* @param api the Bench API for this test runner.
*/
void restartDocker(Bench api) {
var timer = api.timer();
if (!Exec.restartDocker(api.property("docker.compose.file", ""), api.property("deephaven.addr", "")))
return;
var metrics = new Metrics(Timer.now(), "test-runner", "setup", "docker");
metrics.set("restart", timer.duration().toMillis(), "standard");
api.metrics().add(metrics);
}

}