From 74ef1537be1cd7370316845b7f2bdb6c45c3e1f1 Mon Sep 17 00:00:00 2001 From: Stan Brubaker <120737309+stanbrub@users.noreply.github.com> Date: Mon, 12 Aug 2024 12:12:28 -0600 Subject: [PATCH] Column Iteration Benchmarks (#328) --- .../compare-scale-benchmark.properties | 2 +- .../compare/iterate/RowIteratorTest.java | 110 ++++++++++++++++++ .../standard/formula/RowIteratorTest.java | 42 +++++++ .../benchmark/run/profile/default.properties | 2 +- .../benchmark/run/profile/queries/compare.py | 33 ++++++ 5 files changed, 187 insertions(+), 2 deletions(-) create mode 100644 src/it/java/io/deephaven/benchmark/tests/compare/iterate/RowIteratorTest.java create mode 100644 src/it/java/io/deephaven/benchmark/tests/standard/formula/RowIteratorTest.java create mode 100644 src/main/resources/io/deephaven/benchmark/run/profile/queries/compare.py diff --git a/.github/resources/compare-scale-benchmark.properties b/.github/resources/compare-scale-benchmark.properties index c9a6dd4f..f9662768 100644 --- a/.github/resources/compare-scale-benchmark.properties +++ b/.github/resources/compare-scale-benchmark.properties @@ -24,7 +24,7 @@ default.data.distribution=random generator.pause.per.row=0 millis # Compression used for generating and storing records (SNAPPY, ZSTD, LZ4, LZO, GZIP, NONE) -record.compression=LZO +record.compression=SNAPPY # Row count to scale tests (Tests can override but typically do not) scale.row.count=70000000 diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/iterate/RowIteratorTest.java b/src/it/java/io/deephaven/benchmark/tests/compare/iterate/RowIteratorTest.java new file mode 100644 index 00000000..bb6eba2c --- /dev/null +++ b/src/it/java/io/deephaven/benchmark/tests/compare/iterate/RowIteratorTest.java @@ -0,0 +1,110 @@ +/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.tests.compare.iterate; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; +import io.deephaven.benchmark.tests.compare.CompareTestRunner; + +/** + * Product comparison tests for iterating and summing table columns. Tests read the same parquet data. To avoid an + * unfair advantage where some products may partition or group data during the read, parquet read time is included in + * the benchmark results. + *
+ * Each test produces a table result containing one row with one column: the total of the per-row sum of two columns,
+ * e.g. sum((r1c1 + r1c2)..(rNc1 + rNc2)). This is achieved without creating an extra column to hold the per-row sums.
+ *
+ * Data generation happens only in the first test, the Deephaven test. Tests can be run individually, but only after
+ * the desired data has been generated.
+ */
+@TestMethodOrder(OrderAnnotation.class)
+public class RowIteratorTest {
+    final CompareTestRunner runner = new CompareTestRunner(this);
+
+    @Test
+    @Order(1)
+    public void deephavenRowIterator() {
+        runner.initDeephaven(2, "source", null, "int250", "int640");
+        var setup = "from deephaven.parquet import read";
+        var op = """
+        source = read('/data/source.parquet').select()
+        result = new_table([
+            long_col('total', [sum(row.int250 + row.int640 for row in source.iter_tuple())])
+        ])
+        """;
+        var msize = "source.size";
+        var rsize = "result.size";
+        runner.test("Deephaven Row Iterator", setup, op, msize, rsize);
+    }
+
+    @Test
+    @Order(2)
+    public void pyarrowRowIterator() {
+        runner.initPython("pyarrow");
+        var setup = """
+        import pyarrow as pa
+        import pyarrow.dataset as ds
+        import pyarrow.compute as pc
+
+        def iterdicts(table):
+            for batch in table.to_batches(1024):
+                d = batch.to_pydict()
+                int250 = d['int250']
+                int640 = d['int640']
+                for i in range(len(int250)):
+                    row = {'int250':int250[i],'int640':int640[i]}
+                    yield row
+        """;
+        var op = """
+        source = ds.dataset('/data/source.parquet', format="parquet").to_table()
+        rsum = sum(row['int250'] + row['int640'] for row in iterdicts(source))
+        result = pa.Table.from_pydict({'total':[rsum]})
+        """;
+        var msize = "source.num_rows";
+        var rsize = "result.num_rows";
+        runner.test("PyArrow Row Iterator", setup, op, msize, rsize);
+    }
+
+    @Test
+    @Order(3)
+    public void pandasRowIterator() {
+        runner.initPython("fastparquet", "pandas");
+        var setup = "import pandas as pd";
+        var op = """
+        source = pd.read_parquet('/data/source.parquet')
+        rsum = sum(row.int250 + row.int640 for row in source.itertuples())
+        result = pd.DataFrame([[rsum]], columns=['total'])
+        """;
+        var msize = "len(source)";
+        var rsize = "len(result)";
+        runner.test("Pandas Row Iterator", setup, op, msize, rsize);
+    }
+
+    @Test
+    @Order(4)
+    public void duckdbRowIterator() {
+        runner.initPython("duckdb");
+        var setup = """
+        import duckdb as db
+
+        def iterdicts(table):
+            while batch := table.fetchmany(1024):
+                for row in batch:
+                    r = {'int250':row[0],'int640':row[1]}
+                    yield r
+        """;
+        var op = """
+        source = db.sql("SELECT * FROM '/data/source.parquet'")
+
+        db.sql("CREATE TABLE results(total INT)")
+        rsum = sum(row['int250'] + row['int640'] for row in iterdicts(source))
+        db.sql("INSERT INTO results VALUES(" + str(rsum) + ")")
+        sourceLen = db.sql("SELECT count(*) FROM source").fetchone()[0]
+        resultLen = db.sql("SELECT count(*) FROM results").fetchone()[0]
+        """;
+        var msize = "sourceLen";
+        var rsize = "resultLen";
+        runner.test("DuckDb Row Iterator", setup, op, msize, rsize);
+    }
+
+}
diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/formula/RowIteratorTest.java b/src/it/java/io/deephaven/benchmark/tests/standard/formula/RowIteratorTest.java
new file mode 100644
index 00000000..ff9b54ea
--- /dev/null
+++ b/src/it/java/io/deephaven/benchmark/tests/standard/formula/RowIteratorTest.java
@@ -0,0 +1,42 @@
+/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
+package io.deephaven.benchmark.tests.standard.formula;
+
+import org.junit.jupiter.api.*;
+import io.deephaven.benchmark.tests.standard.StandardTestRunner;
+
+/**
+ * Standard tests for iterating through tables to access column values directly. These benchmarks iterate through the
+ * same columns and do the same sums.
+ */
+public class RowIteratorTest {
+    final StandardTestRunner runner = new StandardTestRunner(this);
+
+    void setup(int rowFactor) {
+        runner.setRowFactor(rowFactor);
+        runner.tables("source");
+        runner.setScaleFactors(1, 0);
+    }
+
+    @Test
+    void iterDict2Cols() {
+        setup(2);
+        var q = """
+        new_table([
+            double_col('total', [sum(row['num1'] + row['num2'] for row in source.iter_dict())])
+        ])
+        """;
+        runner.test("Row-IterDict- Sum 2 Double Cols", 1, q, "num1", "num2");
+    }
+
+    @Test
+    void iterTuple2Cols() {
+        setup(4);
+        var q = """
+        new_table([
+            double_col('total', [sum(row.num1 + row.num2 for row in source.iter_tuple())])
+        ])
+        """;
+        runner.test("Row-IterTuple- Sum 2 Double Cols", 1, q, "num1", "num2");
+    }
+
+}
diff --git a/src/main/resources/io/deephaven/benchmark/run/profile/default.properties b/src/main/resources/io/deephaven/benchmark/run/profile/default.properties
index 9f1d382e..bc4cab82 100644
--- a/src/main/resources/io/deephaven/benchmark/run/profile/default.properties
+++ b/src/main/resources/io/deephaven/benchmark/run/profile/default.properties
@@ -24,7 +24,7 @@ default.data.distribution=random
 generator.pause.per.row=0 millis
 
 # Compression used for generating and storing records (ZSTD, LZ4, LZO, GZIP, SNAPPY, NONE)
-record.compression=LZO
+record.compression=SNAPPY
 
 # Row count to scale tests (Tests can override but typically do not)
 scale.row.count=100000
diff --git a/src/main/resources/io/deephaven/benchmark/run/profile/queries/compare.py b/src/main/resources/io/deephaven/benchmark/run/profile/queries/compare.py
new file mode 100644
index 00000000..8c5d32c1
--- /dev/null
+++ b/src/main/resources/io/deephaven/benchmark/run/profile/queries/compare.py
@@ -0,0 +1,33 @@
+# Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending
+#
+# Supporting Deephaven queries that use the benchmark_snippet to investigate product comparisons.
+# - Generate a table that shows the comparison between products for each benchmark
+# Requirements: Deephaven 0.23.0 or greater
+
+from urllib.request import urlopen; import os
+
+benchmark_set_arg = 'stanbrub/full-set-140M'
+
+root = 'file:///data' if os.path.exists('/data/deephaven-benchmark') else 'https://storage.googleapis.com'
+with urlopen(root + '/deephaven-benchmark/benchmark_tables.dh.py') as r:
+    benchmark_storage_uri_arg = root + '/deephaven-benchmark'
+    benchmark_category_arg = 'adhoc'
+    benchmark_actor_filter_arg = os.path.dirname(benchmark_set_arg)
+    benchmark_set_filter_arg = os.path.basename(benchmark_set_arg)
+    exec(r.read().decode(), globals(), locals())
+
+
+product_compare = bench_results_sets.view([
+    'Product=benchmark_name.replaceAll(`[ ].*$`,``)', 'Benchmark=benchmark_name.replaceAll(`^[^ ]+[ ]`,``)',
+    'Rate=op_rate'
+]).sort(['Benchmark','Product'])
+
+from deephaven import numpy as dhnp
+products = dhnp.to_numpy(product_compare.select_distinct(['Product']))
+products = [str(prod[0]) for prod in products]
+
+product_compare = product_compare.group_by(['Benchmark']).view(['Benchmark'] + [
+    products[i] + '=Rate[' + str(i) + ']' for i in range(len(products))
+])
+
+bench_results = bench_metrics = bench_platforms = bench_results_sets = bench_results_change = None
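Note on the shared pattern (an illustrative sketch, not part of the patch): every test above computes
sum(col1 + col2) across rows by iterating, without materializing a helper column. The tests embed this
in query strings executed on the server, where the Deephaven console's implicit imports apply; a
standalone equivalent is sketched below. The three-row table is hypothetical stand-in data; 'int250'
and 'int640' are the column names the compare tests generate.

    from deephaven import new_table
    from deephaven.column import long_col

    # Hypothetical stand-in for the parquet-backed benchmark table
    source = new_table([
        long_col('int250', [1, 2, 3]),
        long_col('int640', [10, 20, 30]),
    ])

    # iter_tuple() yields a named tuple per row (used by the compare test and iterTuple2Cols)
    total = sum(row.int250 + row.int640 for row in source.iter_tuple())

    # iter_dict() yields a dict per row (used by iterDict2Cols)
    total2 = sum(row['int250'] + row['int640'] for row in source.iter_dict())

    # One result row holding the grand total, as in the benchmark ops
    result = new_table([long_col('total', [total])])
    assert total == total2 == 66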
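Note on the compare.py pivot (a sketch under stated assumptions, not part of the patch): the script
sorts by ['Benchmark','Product'] before group_by, so each grouped Rate array lists products in the
same alphabetical order as the products list built from select_distinct, and Rate[i] picks product
i's rate for that benchmark. A toy version of the same idiom, assuming a Deephaven session and
hypothetical product names:

    from deephaven import new_table
    from deephaven.column import string_col, double_col

    # Hypothetical long-format results: one row per (benchmark, product)
    t = new_table([
        string_col('Benchmark', ['Sum', 'Sum']),
        string_col('Product', ['Deephaven', 'Pandas']),
        double_col('Rate', [100.0, 40.0]),
    ]).sort(['Benchmark', 'Product'])

    products = ['Deephaven', 'Pandas']
    # group_by collapses Rate into an array per Benchmark; indexing it pivots the table wide
    pivot = t.group_by(['Benchmark']).view(['Benchmark'] + [
        products[i] + '=Rate[' + str(i) + ']' for i in range(len(products))
    ])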