diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java
index 6c0ad3d2..764e3182 100644
--- a/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java
+++ b/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java
@@ -27,7 +27,8 @@
  */
 public class CompareTestRunner {
     final Object testInst;
-    final List<String> pipPackages = new ArrayList<>();
+    final Set<String> requiredPackages = new LinkedHashSet<>();
+    final Map<String, String> downloadFiles = new LinkedHashMap<>();
     private Bench api = null;
 
     public CompareTestRunner(Object testInst) {
@@ -42,6 +43,10 @@ public CompareTestRunner(Object testInst) {
     public Bench api() {
         return api;
     }
+
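+    /**
+     * Register a file to be downloaded into the benchmark environment before the test runs. A relative destination
+     * directory resolves against the benchmark virtual environment, since the download script runs from there
+     */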
+    public void addDownloadFiles(String sourceUri, String destDir) {
+        downloadFiles.put(sourceUri, destDir);
+    }
 
     /**
      * Initialize the test as a purely Deephaven test. This should only be called once at the beginning of the test
@@ -66,7 +71,8 @@ public void initDeephaven(int rowCountFactor, String leftTable, String rightTabl
     public void initPython(String... packages) {
         restartDocker(1);
         initialize(testInst);
-        pipPackages.addAll(Arrays.asList(packages));
+        requiredPackages.addAll(Arrays.asList(packages));
+        requiredPackages.add("install-jdk");
     }
 
     /**
@@ -97,8 +103,8 @@ public void test(String name, String setup, String operation, String mainSizeGet
     public void test(String name, long expectedRowCount, String setup, String operation, String mainSizeGetter,
             String resultSizeGetter) {
         Result result;
-        if (pipPackages.size() > 0) {
-            installPipPackages();
+        if (requiredPackages.size() > 0) {
+            installRequiredPackages();
             result = runPythonTest(name, setup, operation, mainSizeGetter, resultSizeGetter);
         } else {
             result = runDeephavenTest(name, setup, operation, mainSizeGetter, resultSizeGetter);
@@ -106,13 +112,16 @@ public void test(String name, long expectedRowCount, String setup, String operat
         var rcount = result.resultRowCount();
         var ecount = (expectedRowCount < 1) ? Long.MAX_VALUE : expectedRowCount;
         assertTrue(rcount > 0 && rcount <= ecount, "Wrong result row count: " + rcount);
+        System.out.println("Result Count: " + rcount);
     }
 
     /**
-     * Tell Deephaven to install python packages using pip in its environment. Note: This assumes that after these
-     * packages are installed, Deephaven will only be used as an agent to run command line python code
+     * Tell Deephaven to install the required Python or Java packages in its environment. Note: This assumes that
+     * after these packages are installed, Deephaven will only be used as an agent to run command-line Python code
      */
-    void installPipPackages() {
+    void installRequiredPackages() {
+        var pipPackages = requiredPackages.stream().filter(p -> !isJdkPackage(p)).toList();
+
         var query = """
         text = '''PACKAGES='${pipPackages}'
         VENV_PATH=~/deephaven-benchmark-venv
@@ -123,12 +132,57 @@ void installPipPackages() {
         ./bin/pip install ${PKG}
         done
         '''
-        run_script('bash', 'setup-benchmark-workspace.sh', text)
+        save_file('setup-benchmark-workspace.sh', text)
+        run_script('bash', 'setup-benchmark-workspace.sh')
         """;
         query = query.replace("${pipPackages}", String.join(" ", pipPackages));
         api.query(query).execute();
+
+        requiredPackages.forEach(p -> installJavaPackage(p));
+        downloadFiles.forEach((s, d) -> placeDownloadFile(s, d));
+    }
+
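+    /**
+     * Return true if the given package descriptor names a JDK by the convention used here (e.g. "jdk-11") rather
+     * than a pip-installable package
+     */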
+    boolean isJdkPackage(String javaDescr) {
+        return javaDescr.matches("jdk-[0-9]+");
+    }
+
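+    /**
+     * Install a Temurin JDK into the benchmark virtual environment through the pip-installed install-jdk package,
+     * recording its location as JAVA_HOME in ENV_VARS.sh for the script that runs the benchmark
+     */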
"10000000") + t_env.get_config().set("python.map-state.read-cache-size", "10000000") + t_env.get_config().set("python.map-state.write-cache-size", "10000000") + t_env.get_config().set("python.metric.enabled", "false") + t_env.get_config().set("python.operator-chaining.enabled", "true") + + os.system('rm -rf /data/results.csv') + + def count_rows(table_name): + sname = table_name + '_stats' + stats_dir = '/data/' + sname + '.csv' + os.system('rm -rf ' + stats_dir) + t_env.execute_sql("CREATE TABLE " + sname + "(row_count BIGINT) WITH ('connector'='filesystem','path'='" + stats_dir + "','format'='csv')") + t_env.execute_sql("INSERT INTO " + sname + " SELECT count(*) AS row_count FROM " + table_name).wait() + count = 0 + for r in t_env.from_path(sname).execute().collect(): + count = r[0] + return count + """; + return q; + } + + static public void addDownloadJar(CompareTestRunner r, String prod, String artifact, String version) { + var destDir = "lib/python3.10/site-packages/pyflink/lib"; + var apacheUri = "https://repo1.maven.org/maven2/org/apache/"; + var uri = apacheUri + prod + '/' + artifact + '/' + version + '/' + artifact + '-' + version + ".jar"; + r.addDownloadFiles(uri, destDir); + } +} diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/agg/AverageByTest.java b/src/it/java/io/deephaven/benchmark/tests/compare/agg/AverageByTest.java index c5c99d97..49a3bbeb 100644 --- a/src/it/java/io/deephaven/benchmark/tests/compare/agg/AverageByTest.java +++ b/src/it/java/io/deephaven/benchmark/tests/compare/agg/AverageByTest.java @@ -4,13 +4,17 @@ import org.junit.jupiter.api.*; import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; import io.deephaven.benchmark.tests.compare.CompareTestRunner; +import io.deephaven.benchmark.tests.compare.Setup; /** * Product comparison tests for the average by group operation. Tests read the same parquet data. To avoid an unfair * advantage where some products may partition or group data during the read, parquet read time is included in the * benchmark results. *
+    void placeDownloadFile(String sourceUri, String destDir) {
+        var query = """
+        text = '''
+        import requests, os
+
+        dest = '${destDir}/' + os.path.basename('${sourceUri}')
+        r = requests.get('${sourceUri}', allow_redirects=True)
+        open(dest, 'wb').write(r.content)
+        '''
+        save_file('install-file.py', text)
+        run_script('./bin/python', 'install-file.py')
+        """;
+        query = query.replace("${sourceUri}", sourceUri);
+        query = query.replace("${destDir}", destDir);
+        api.query(query).execute();
+    }
 
     /**
      * Run the test in Deephaven proper. Do not push to the command line.
      *
@@ -143,7 +197,7 @@ Result runDeephavenTest(String name, String setup, String operation, String main
         var query = """
         begin_time = time.perf_counter_ns()
         ${setupQueries}
-        result = ${operation}
+        ${operation}
 
         op_duration = time.perf_counter_ns() - begin_time
 
         stats = new_table([
@@ -166,10 +220,20 @@ Result runDeephavenTest(String name, String setup, String operation, String main
      */
     Result runPythonTest(String name, String setup, String operation, String mainSizeGetter, String resultSizeGetter) {
         var query = """
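+        # write a wrapper script that sources ENV_VARS.sh (e.g. JAVA_HOME set by installJavaPackage) so those
+        # variables are visible to the benchmark script when it runs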
+        text = '''#!/usr/bin/env bash
+        touch ENV_VARS.sh
+        source ENV_VARS.sh
+        ./bin/python $1
+        '''
+        save_file('run-benchmark-test.sh', text)
+        """;
+        api.query(query).execute();
+
+        query = """
         text = '''import time
-        begin_time = time.perf_counter_ns()
         ${setupQueries}
-        result = ${operation}
+        begin_time = time.perf_counter_ns()
+        ${operation}
         op_duration = time.perf_counter_ns() - begin_time
         main_size = ${mainSizeGetter}
         result_size = ${resultSizeGetter}
@@ -177,7 +241,8 @@ Result runPythonTest(String name, String setup, String operation, String mainSiz
         print("-- Test Results --")
         print("{", "'duration':", op_duration, ", 'main_size':", main_size, ", 'result_size':", result_size, "}")
         '''
-        result = run_script('./bin/python', 'benchmark-test.py', text)
+        save_file('benchmark-test.py', text)
+        result = run_script('./run-benchmark-test.sh', 'benchmark-test.py')
 
         result = eval(result.splitlines()[-1])
         stats = new_table([
@@ -224,11 +289,16 @@ void initialize(Object testInst) {
         user_home = str(Path.home())
         benchmark_home = user_home + '/deephaven-benchmark-venv'
-
-        def run_script(runner, script_name, script_text):
+
+        def save_file(file_name, file_text):
             os.makedirs(benchmark_home, exist_ok=True)
-            with open(benchmark_home + '/' + script_name, 'w') as f:
-                f.write(script_text)
+            file_path = benchmark_home + '/' + file_name
+            with open(file_path, 'w') as f:
+                f.write(file_text)
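+            # mark the saved file executable so generated scripts like run-benchmark-test.sh can be invoked directly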
+            st = os.stat(file_path)
+            os.chmod(file_path, st.st_mode | stat.S_IEXEC)
+
+        def run_script(runner, script_name):
             command_array = [runner, script_name]
             output=subprocess.check_output(command_array, cwd=benchmark_home).decode("utf-8")
             print(output)
diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/Setup.java b/src/it/java/io/deephaven/benchmark/tests/compare/Setup.java
new file mode 100644
index 00000000..fc5483bb
--- /dev/null
+++ b/src/it/java/io/deephaven/benchmark/tests/compare/Setup.java
@@ -0,0 +1,52 @@
+package io.deephaven.benchmark.tests.compare;
+
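+/**
+ * Common setup for the Flink comparison tests. Registers the extra jars pyflink needs for parquet support and
+ * supplies the query text that creates a batch TableEnvironment configured for these benchmarks
+ */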
+public class Setup {
+    static public String flink(CompareTestRunner r) {
+        addDownloadJar(r, "flink", "flink-sql-parquet", "1.17.1");
+        addDownloadJar(r, "flink", "flink-oss-fs-hadoop", "1.17.1");
+        addDownloadJar(r, "hadoop", "hadoop-mapreduce-client-core", "2.10.2");
+
+        var q = """
+        import time, os
+        import pandas as pd
+        from pyflink.common import Row
+        from pyflink.table import (EnvironmentSettings, TableEnvironment)
+        from pyflink.table.expressions import lit, col
+        from pyflink.table.udf import udtf
+
+        t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
+        t_env.get_config().set("parallelism.default", "1")
+        t_env.get_config().set("taskmanager.memory.process.size", "24g")
+        t_env.get_config().set("jobmanager.memory.process.size", "24g")
+        t_env.get_config().set("python.fn-execution.arrow.batch.size", "10000000")
+        t_env.get_config().set("python.fn-execution.bundle.size", "10000000")
+        t_env.get_config().set("python.state.cache-size", "10000000")
+        t_env.get_config().set("python.map-state.iterate-response-batch-size", "10000000")
+        t_env.get_config().set("python.map-state.read-cache-size", "10000000")
+        t_env.get_config().set("python.map-state.write-cache-size", "10000000")
+        t_env.get_config().set("python.metric.enabled", "false")
+        t_env.get_config().set("python.operator-chaining.enabled", "true")
+
+        os.system('rm -rf /data/results.csv')
+
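+        # count table rows by inserting count(*) into a csv-backed stats table, then collecting the single result row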
+        def count_rows(table_name):
+            sname = table_name + '_stats'
+            stats_dir = '/data/' + sname + '.csv'
+            os.system('rm -rf ' + stats_dir)
+            t_env.execute_sql("CREATE TABLE " + sname + "(row_count BIGINT) WITH ('connector'='filesystem','path'='" + stats_dir + "','format'='csv')")
+            t_env.execute_sql("INSERT INTO " + sname + " SELECT count(*) AS row_count FROM " + table_name).wait()
+            count = 0
+            for r in t_env.from_path(sname).execute().collect():
+                count = r[0]
+            return count
+        """;
+        return q;
+    }
+
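+    /**
+     * Register a product jar from Maven Central to be downloaded into pyflink's lib directory, which places it on
+     * Flink's classpath
+     */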
+    static public void addDownloadJar(CompareTestRunner r, String prod, String artifact, String version) {
+        var destDir = "lib/python3.10/site-packages/pyflink/lib";
+        var apacheUri = "https://repo1.maven.org/maven2/org/apache/";
+        var uri = apacheUri + prod + '/' + artifact + '/' + version + '/' + artifact + '-' + version + ".jar";
+        r.addDownloadFiles(uri, destDir);
+    }
+
+}
diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/agg/AverageByTest.java b/src/it/java/io/deephaven/benchmark/tests/compare/agg/AverageByTest.java
index c5c99d97..49a3bbeb 100644
--- a/src/it/java/io/deephaven/benchmark/tests/compare/agg/AverageByTest.java
+++ b/src/it/java/io/deephaven/benchmark/tests/compare/agg/AverageByTest.java
@@ -4,13 +4,17 @@
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
 import io.deephaven.benchmark.tests.compare.CompareTestRunner;
+import io.deephaven.benchmark.tests.compare.Setup;
 
 /**
  * Product comparison tests for the average by group operation. Tests read the same parquet data. To avoid an unfair
  * advantage where some products may partition or group data during the read, parquet read time is included in the
  * benchmark results.
  * <p/>
- * Each test calculates two new average columns and groups by a string and an integer
+ * Each test calculates two new average columns and groups by a string and an integer.
+ * <p/>
+ * Data generation only happens in the first test, the Deephaven test. Tests can be run individually, but only after
+ * the desired data has been generated.
  */
 @TestMethodOrder(OrderAnnotation.class)
 public class AverageByTest {
@@ -23,12 +27,14 @@ public void deephavenAverageBy() {
         var setup = """
         from deephaven import agg
         from deephaven.parquet import read
-        source = read('/data/source.parquet').select()
         aggs = [
             agg.avg('Avg1=int250'), agg.avg('Avg2=int640')
         ]
         """;
-        var op = "source.agg_by(aggs, by=['str250', 'int640'])";
+        var op = """
+        source = read('/data/source.parquet').select()
+        result = source.agg_by(aggs, by=['str250', 'int640'])
+        """;
         var msize = "source.size";
         var rsize = "result.size";
         runner.test("Deephaven Average By", setup, op, msize, rsize);
@@ -38,11 +44,11 @@ public void deephavenAverageBy() {
     @Order(2)
     public void pyarrowAverageBy() {
         runner.initPython("pyarrow");
-        var setup = """
-        import pyarrow.dataset as ds
+        var setup = "import pyarrow.dataset as ds";
+        var op = """
         source = ds.dataset('/data/source.parquet', format="parquet").to_table()
+        result = source.group_by(['str250', 'int640']).aggregate([('int250','mean'), ('int640','mean')])
         """;
-        var op = "source.group_by(['str250', 'int640']).aggregate([('int250','mean'), ('int640','mean')])";
         var msize = "source.num_rows";
         var rsize = "result.num_rows";
         runner.test("PyArrow Average By", setup, op, msize, rsize);
@@ -52,12 +58,10 @@ public void pyarrowAverageBy() {
     @Order(3)
     public void pandasAverageBy() {
         runner.initPython("fastparquet", "pandas");
-        var setup = """
-        import pandas as pd
-        source = pd.read_parquet('/data/source.parquet')
-        """;
+        var setup = "import pandas as pd";
         var op = """
-        source.groupby(['str250', 'int640']).agg(
+        source = pd.read_parquet('/data/source.parquet')
+        result = source.groupby(['str250', 'int640']).agg(
             Avg1=pd.NamedAgg('int250', "mean"), Avg2=pd.NamedAgg('int640', 'mean')
         )
         """;
@@ -66,4 +70,21 @@ public void pandasAverageBy() {
         runner.test("Pandas Average By", setup, op, msize, rsize);
     }
 
+    @Test
+    @Order(4)
+    @Disabled
+    public void flinkAverageBy() {
+        runner.initPython("apache-flink", "jdk-11");
+        var op = """
+        t_env.execute_sql("CREATE TABLE source(int250 INT,int640 INT,str250 STRING) WITH ('connector'='filesystem','path'='/data/source.parquet','format'='parquet')")
+        t_env.execute_sql("CREATE TABLE results(str250 STRING,int640 INT,Avg1 INT,Avg2 INT) WITH ('connector'='filesystem','path'='/data/results.csv','format'='csv')")
+        #t_env.execute_sql("CREATE TABLE results(str250 STRING,int640 INT,Avg1 INT,Avg2 INT) WITH ('connector'='blackhole')")
+        t_env.execute_sql("INSERT INTO results SELECT str250,int640,AVG(int250) AS Avg1,AVG(int640) AS Avg2 FROM source GROUP BY str250, int640").wait()
+        """;
+
+        var msize = "count_rows('source')";
+        var rsize = "count_rows('results')"; // Change to 1 when using the blackhole connector
+        runner.test("Flink Average By", Setup.flink(runner), op, msize, rsize);
+    }
+
 }
diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/distinct/DistinctTest.java b/src/it/java/io/deephaven/benchmark/tests/compare/distinct/DistinctTest.java
index cd4191e6..448a2ba5 100644
--- a/src/it/java/io/deephaven/benchmark/tests/compare/distinct/DistinctTest.java
+++ b/src/it/java/io/deephaven/benchmark/tests/compare/distinct/DistinctTest.java
@@ -4,13 +4,17 @@
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
 import io.deephaven.benchmark.tests.compare.CompareTestRunner;
+import io.deephaven.benchmark.tests.compare.Setup;
 
 /**
  * Product comparison tests for the distinct (or select distinct) group operation. Tests read the same parquet data. To
  * avoid an unfair advantage where some products may partition or group data during the read, parquet read time is
  * included in the benchmark results.
  * <p/>
- * Each test produces a table result that contains rows unique according to a string and an integer
+ * Each test produces a table result that contains rows unique according to a string and an integer.
+ * <p/>
+ * Data generation only happens in the first test, the Deephaven test. Tests can be run individually, but only after
+ * the desired data has been generated.
  */
 @TestMethodOrder(OrderAnnotation.class)
 public class DistinctTest {
@@ -20,11 +24,11 @@ public class DistinctTest {
     @Order(1)
     public void deephavenDistinct() {
         runner.initDeephaven(2, "source", null, "int640", "str250");
-        var setup = """
-        from deephaven.parquet import read
+        var setup = "from deephaven.parquet import read";
+        var op = """
         source = read('/data/source.parquet').select()
+        result = source.select_distinct(formulas=['str250', 'int640'])
         """;
-        var op = "source.select_distinct(formulas=['str250', 'int640'])";
         var msize = "source.size";
         var rsize = "result.size";
         runner.test("Deephaven Distinct", setup, op, msize, rsize);
@@ -34,11 +38,11 @@ public void deephavenDistinct() {
     @Order(2)
     public void pyarrowDistinct() {
         runner.initPython("pyarrow");
-        var setup = """
-        import pyarrow.dataset as ds
+        var setup = "import pyarrow.dataset as ds";
+        var op = """
         source = ds.dataset('/data/source.parquet', format="parquet").to_table()
+        result = source.group_by(['str250', 'int640']).aggregate([])
         """;
-        var op = "source.group_by(['str250', 'int640']).aggregate([])";
         var msize = "source.num_rows";
         var rsize = "result.num_rows";
         runner.test("PyArrow Distinct", setup, op, msize, rsize);
@@ -48,14 +52,30 @@ public void pyarrowDistinct() {
     @Order(3)
     public void pandasDistinct() {
         runner.initPython("fastparquet", "pandas");
-        var setup = """
-        import pandas as pd
+        var setup = "import pandas as pd";
+        var op = """
         source = pd.read_parquet('/data/source.parquet')
+        result = source.drop_duplicates(subset=['str250','int640'], keep='last')
         """;
-        var op = "source.drop_duplicates(subset=['str250','int640'], keep='last')";
         var msize = "len(source)";
         var rsize = "len(result)";
         runner.test("Pandas Distinct", setup, op, msize, rsize);
     }
 
+    @Test
+    @Order(4)
+    @Disabled
+    public void flinkDistinct() {
+        runner.initPython("apache-flink", "jdk-11");
+        var op = """
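+        # read parquet through pandas and convert to a Flink table; the read is part of the measured operation,
+        # matching the other product tests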
+        source = pd.read_parquet('/data/source.parquet')
+        loaded_size = len(source)
+        source = t_env.from_pandas(source)
+        result = source.select(col('str250'), col('int640')).distinct().to_pandas()
+        """;
+        var msize = "loaded_size";
+        var rsize = "len(result)";
+        runner.test("Flink Distinct", Setup.flink(runner), op, msize, rsize);
+    }
+
 }
diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/filter/FilterTest.java b/src/it/java/io/deephaven/benchmark/tests/compare/filter/FilterTest.java
index 8a55bdd3..86f4f3f8 100644
--- a/src/it/java/io/deephaven/benchmark/tests/compare/filter/FilterTest.java
+++ b/src/it/java/io/deephaven/benchmark/tests/compare/filter/FilterTest.java
@@ -4,6 +4,7 @@
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
 import io.deephaven.benchmark.tests.compare.CompareTestRunner;
+import io.deephaven.benchmark.tests.compare.Setup;
 
 /**
  * Product comparison tests for filter (where) operations. Tests read the same parquet data. To avoid an unfair
@@ -11,7 +12,10 @@
  * benchmark results.
  * <p/>
  * Each test produces a table result filtered by three criteria; value is an exact string, value > an integer, value <
- * an integer
+ * an integer.
+ * <p/>
+ * Data generation only happens in the first test, the Deephaven test. Tests can be run individually, but only after
+ * the desired data has been generated.
  */
 @TestMethodOrder(OrderAnnotation.class)
 public class FilterTest {
@@ -21,12 +25,10 @@ public class FilterTest {
     @Order(1)
     public void deephavenFilter() {
         runner.initDeephaven(2, "source", null, "str250", "int640");
-        var setup = """
-        from deephaven.parquet import read
-        source = read('/data/source.parquet').select()
-        """;
+        var setup = "from deephaven.parquet import read";
         var op = """
-        source.where(["str250 = '250'", "int640 > 100", "int640 < 540"]);
+        source = read('/data/source.parquet').select()
+        result = source.where(["str250 = '250'", "int640 > 100", "int640 < 540"])
         """;
         var msize = "source.size";
         var rsize = "result.size";
@@ -40,10 +42,12 @@ public void pyarrowFilter() {
         var setup = """
         import pyarrow.dataset as ds
         import pyarrow.compute as pc
+        """;
+        var op = """
         source = ds.dataset('/data/source.parquet', format="parquet").to_table()
         expr = (pc.field('str250') == '250') & (pc.field('int640') > 100) & (pc.field('int640') < 540)
+        result = source.filter(expr)
         """;
-        var op = "source.filter(expr)";
         var msize = "source.num_rows";
         var rsize = "result.num_rows";
         runner.test("PyArrow Filter", setup, op, msize, rsize);
@@ -53,16 +57,30 @@ public void pyarrowFilter() {
     @Order(3)
     public void pandasFilter() {
         runner.initPython("fastparquet", "pandas");
-        var setup = """
-        import pandas as pd
-        source = pd.read_parquet('/data/source.parquet')
-        """;
+        var setup = "import pandas as pd";
         var op = """
-        source.query("str250 == '250' & int640 > 100 & int640 < 540")
+        source = pd.read_parquet('/data/source.parquet')
+        result = source.query("str250 == '250' & int640 > 100 & int640 < 540")
         """;
         var msize = "len(source)";
         var rsize = "len(result)";
         runner.test("Pandas Filter", setup, op, msize, rsize);
     }
 
+    @Test
+    @Order(4)
+    @Disabled
+    public void flinkFilter() {
+        runner.initPython("apache-flink", "jdk-11");
+        var op = """
+        source = pd.read_parquet('/data/source.parquet')
+        loaded_size = len(source)
+        source = t_env.from_pandas(source)
+        result = source.filter((col('str250') == '250') & (col('int640') > 100) & (col('int640') < 540)).to_pandas()
+        """;
+        var msize = "loaded_size";
+        var rsize = "len(result)";
+        runner.test("Flink Filter", Setup.flink(runner), op, msize, rsize);
+    }
+
 }
diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/join/InnerJoinTest.java b/src/it/java/io/deephaven/benchmark/tests/compare/join/InnerJoinTest.java
index d31bff7c..edc5f8c4 100644
--- a/src/it/java/io/deephaven/benchmark/tests/compare/join/InnerJoinTest.java
+++ b/src/it/java/io/deephaven/benchmark/tests/compare/join/InnerJoinTest.java
@@ -4,13 +4,17 @@
 import static org.junit.jupiter.api.MethodOrderer.*;
 import org.junit.jupiter.api.*;
 import io.deephaven.benchmark.tests.compare.CompareTestRunner;
+import io.deephaven.benchmark.tests.compare.Setup;
 
 /**
- * Product comparison tests for inner join operations. Tests read the same parquet data. To avoid an unfair
- * advantage where some products may partition or group data during the read, parquet read time is included in the
- * benchmark results.
+ * Product comparison tests for inner join operations. Tests read the same parquet data. To avoid an unfair advantage
+ * where some products may partition or group data during the read, parquet read time is included in the benchmark
+ * results.
  * <p/>
- * Each test produces a table that is the result of two tables intersected by a string and an integer
+ * Each test produces a table that is the result of two tables intersected by a string and an integer.
+ * <p/>
+ * Data generation only happens in the first test, the Deephaven test. Tests can be run individually, but only after
+ * the desired data has been generated.
  */
 @TestMethodOrder(OrderAnnotation.class)
 public class InnerJoinTest {
@@ -18,14 +22,14 @@ public class InnerJoinTest {
 
     @Test
     @Order(1)
-    public void deephavenJoin() {
+    public void deephavenInnerJoin() {
         runner.initDeephaven(2, "source", "right", "int1M", "str250", "r_int1M", "r_str250");
-        var setup = """
-        from deephaven.parquet import read
+        var setup = "from deephaven.parquet import read";
+        var op = """
         source = read('/data/source.parquet').select()
         right = read('/data/right.parquet').select()
+        result = source.join(right, on=['str250 = r_str250', 'int1M = r_int1M'])
         """;
-        var op = "source.join(right, on=['str250 = r_str250', 'int1M = r_int1M'])";
         var msize = "source.size";
         var rsize = "result.size";
         runner.test("Deephaven Inner Join", setup, op, msize, rsize);
@@ -33,32 +37,50 @@ public void deephavenJoin() {
 
     @Test
     @Order(2)
-    public void pyarrowJoin() {
+    public void pyarrowInnerJoin() {
         runner.initPython("pyarrow");
-        var setup = """
-        import pyarrow.dataset as ds
+        var setup = "import pyarrow.dataset as ds";
+        var op = """
         source = ds.dataset('/data/source.parquet', format="parquet").to_table()
         right = ds.dataset('/data/right.parquet', format="parquet").to_table()
+        result = source.join(right, keys=['str250','int1M'], right_keys=['r_str250','r_int1M'], join_type='inner')
         """;
-        var op = "source.join(right, keys=['str250','int1M'], right_keys=['r_str250','r_int1M'], join_type='inner')";
         var msize = "source.num_rows";
         var rsize = "result.num_rows";
         runner.test("PyArrow Inner Join", setup, op, msize, rsize);
     }
-
+
     @Test
     @Order(3)
-    public void pandasJoin() {
+    public void pandasInnerJoin() {
         runner.initPython("fastparquet", "pandas");
-        var setup = """
-        import pandas as pd
+        var setup = "import pandas as pd";
+        var op = """
         source = pd.read_parquet('/data/source.parquet')
         right = pd.read_parquet('/data/right.parquet')
+        result = source.merge(right, left_on=['str250','int1M'], right_on=['r_str250','r_int1M'], how='inner')
         """;
-        var op = "source.merge(right, left_on=['str250','int1M'], right_on=['r_str250','r_int1M'], how='inner')";
         var msize = "len(source)";
         var rsize = "len(result)";
         runner.test("Pandas Inner Join", setup, op, msize, rsize);
     }
 
+    @Test
+    @Order(4)
+    @Disabled
+    public void flinkInnerJoin() {
+        runner.initPython("apache-flink", "jdk-11");
+        var op = """
+        source = pd.read_parquet('/data/source.parquet')
+        loaded_size = len(source)
+        source = t_env.from_pandas(source)
+        right = pd.read_parquet('/data/right.parquet')
+        right = t_env.from_pandas(right)
+        result = source.join(right, (col('str250') == col('r_str250')) & (col('int1M') == col('r_int1M'))).to_pandas()
+        """;
+        var msize = "loaded_size";
+        var rsize = "len(result)";
+        runner.test("Flink Inner Join", Setup.flink(runner), op, msize, rsize);
+    }
+
 }
diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/sort/SortTest.java b/src/it/java/io/deephaven/benchmark/tests/compare/sort/SortTest.java
index 0fca705c..80d8b68c 100644
--- a/src/it/java/io/deephaven/benchmark/tests/compare/sort/SortTest.java
+++ b/src/it/java/io/deephaven/benchmark/tests/compare/sort/SortTest.java
@@ -4,13 +4,16 @@
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
 import io.deephaven.benchmark.tests.compare.CompareTestRunner;
+import io.deephaven.benchmark.tests.compare.Setup;
 
 /**
- * Product comparison tests for sort operations.  Tests read the same parquet data. To avoid an unfair
- * advantage where some products may partition or group data during the read, parquet read time is included in the
- * benchmark results.
+ * Product comparison tests for sort operations. Tests read the same parquet data. To avoid an unfair advantage where
+ * some products may partition or group data during the read, parquet read time is included in the benchmark results.
  * <p/>
- * Each test sorts a table by a string and an integer
+ * Each test sorts a table by a string and an integer.
+ * <p/>
+ * Data generation only happens in the first test, the Deephaven test. Tests can be run individually, but only after
+ * the desired data has been generated.
  */
 @TestMethodOrder(OrderAnnotation.class)
 public class SortTest {
@@ -20,11 +23,11 @@ public class SortTest {
     @Order(1)
     public void deephavenSort() {
         runner.initDeephaven(1, "source", null, "int640", "str250");
-        var setup = """
-        from deephaven.parquet import read
+        var setup = "from deephaven.parquet import read";
+        var op = """
         source = read('/data/source.parquet').select()
+        result = source.sort(order_by=['str250', 'int640'])
         """;
-        var op = "source.sort(order_by=['str250', 'int640'])";
         var msize = "source.size";
         var rsize = "result.size";
         runner.test("Deephaven Sort", setup, op, msize, rsize);
@@ -34,28 +37,44 @@ public void deephavenSort() {
     @Order(2)
     public void pyarrowSort() {
         runner.initPython("pyarrow");
-        var setup = """
-        import pyarrow.dataset as ds
+        var setup = "import pyarrow.dataset as ds";
+        var op = """
         source = ds.dataset('/data/source.parquet', format="parquet").to_table()
+        result = source.sort_by([('str250','ascending'), ('int640','ascending')])
         """;
-        var op = "source.sort_by([('str250','ascending'), ('int640','ascending')])";
         var msize = "source.num_rows";
         var rsize = "result.num_rows";
         runner.test("PyArrow Sort", setup, op, msize, rsize);
     }
-
+
     @Test
     @Order(3)
     public void pandasSort() {
         runner.initPython("fastparquet", "pandas");
-        var setup = """
-        import pandas as pd
+        var setup = "import pandas as pd";
+        var op = """
         source = pd.read_parquet('/data/source.parquet')
+        result = source.sort_values(by=['str250','int640'])
         """;
-        var op = "source.sort_values(by=['str250','int640'])";
         var msize = "len(source)";
         var rsize = "len(result)";
         runner.test("Pandas Sort", setup, op, msize, rsize);
     }
 
+    @Test
+    @Order(4)
+    @Disabled
+    public void flinkSort() {
+        runner.initPython("apache-flink", "jdk-11");
+        var op = """
+        source = pd.read_parquet('/data/source.parquet')
+        loaded_size = len(source)
+        source = t_env.from_pandas(source)
+        result = source.order_by(col('str250'), col('int640')).to_pandas()
+        """;
+        var msize = "loaded_size";
+        var rsize = "len(result)";
+        runner.test("Flink Sort", Setup.flink(runner), op, msize, rsize);
+    }
+
 }