PyFlink comparison benchmarks (#163)
stanbrub authored Sep 12, 2023
1 parent c0ee150 commit b4ff081
Showing 7 changed files with 302 additions and 80 deletions.
@@ -27,7 +27,8 @@
*/
public class CompareTestRunner {
final Object testInst;
final List<String> pipPackages = new ArrayList<>();
final Set<String> requiredPackages = new LinkedHashSet<>();
final Map<String,String> downloadFiles = new LinkedHashMap<>();
private Bench api = null;

public CompareTestRunner(Object testInst) {
@@ -42,6 +43,10 @@ public CompareTestRunner(Object testInst) {
public Bench api() {
return api;
}

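/**
 * Register a file to be downloaded into the benchmark workspace before the tests run (e.g. extra
 * jars for PyFlink). Downloads occur after any required packages are installed
 *
 * @param sourceUri the URI to fetch the file from
 * @param destDir the destination directory relative to the benchmark venv
 */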
public void addDownloadFiles(String sourceUri, String destDir) {
downloadFiles.put(sourceUri, destDir);
}

/**
* Initialize the test as a purely Deephaven test. This should only be called once at the beginning of the test
@@ -66,7 +71,8 @@ public void initDeephaven(int rowCountFactor, String leftTable, String rightTabl
public void initPython(String... packages) {
restartDocker(1);
initialize(testInst);
pipPackages.addAll(Arrays.asList(packages));
requiredPackages.addAll(Arrays.asList(packages));
requiredPackages.add("install-jdk");
}

/**
@@ -97,22 +103,25 @@ public void test(String name, String setup, String operation, String mainSizeGet
public void test(String name, long expectedRowCount, String setup, String operation, String mainSizeGetter,
String resultSizeGetter) {
Result result;
if (pipPackages.size() > 0) {
installPipPackages();
if (requiredPackages.size() > 0) {
installRequiredPackages();
result = runPythonTest(name, setup, operation, mainSizeGetter, resultSizeGetter);
} else {
result = runDeephavenTest(name, setup, operation, mainSizeGetter, resultSizeGetter);
}
var rcount = result.resultRowCount();
var ecount = (expectedRowCount < 1) ? Long.MAX_VALUE : expectedRowCount;
assertTrue(rcount > 0 && rcount <= ecount, "Wrong result row count: " + rcount);
System.out.println("Result Count: " + rcount);
}

/**
* Tell Deephaven to install python packages using pip in its environment. Note: This assumes that after these
* packages are installed, Deephaven will only be used as an agent to run command line python code
* Tell Deephaven to install required Python or Java packages in its environment. Note: This assumes that after
* these packages are installed, Deephaven will only be used as an agent to run command-line Python code
*/
void installPipPackages() {
void installRequiredPackages() {
var pipPackages = requiredPackages.stream().filter(p -> !isJdkPackage(p)).toList();

var query = """
text = '''PACKAGES='${pipPackages}'
VENV_PATH=~/deephaven-benchmark-venv
@@ -123,12 +132,57 @@ void installPipPackages() {
./bin/pip install ${PKG}
done
'''
run_script('bash', 'setup-benchmark-workspace.sh', text)
save_file('setup-benchmark-workspace.sh', text)
run_script('bash', 'setup-benchmark-workspace.sh')
""";
query = query.replace("${pipPackages}", String.join(" ", pipPackages));
api.query(query).execute();

requiredPackages.forEach(p -> installJavaPackage(p));
downloadFiles.forEach((s,d) -> placeDownloadFile(s, d));
}

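/**
 * Return true if the given package descriptor names a JDK install (e.g. "jdk-11") rather than a pip
 * package
 */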
boolean isJdkPackage(String javaDescr) {
return javaDescr.matches("jdk-[0-9]+");
}

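/**
 * Install a Temurin JDK into the benchmark venv using the install-jdk pip package and record its
 * location as JAVA_HOME in ENV_VARS.sh. Package descriptors that do not name a JDK are ignored
 */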
void installJavaPackage(String javaDescr) {
if (!isJdkPackage(javaDescr))
return;
var version = javaDescr.replaceAll("jdk-([0-9]+)", "$1");
var query = """
text = '''
import os, jdk
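# skip the install if this JDK version is already unpacked in the workspace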
if not os.path.exists('./jdk-${version}'):
    jdk.install('${version}', vendor='Temurin', path='./')
    os.system('mv jdk*/ jdk-${version}')
    os.system('echo JAVA_HOME=$PWD/jdk-${version} > ENV_VARS.sh')
'''
save_file('install-jdk.py', text)
run_script('./bin/python', 'install-jdk.py')
""";
query = query.replace("${version}", String.join(" ", version));
api.query(query).execute();
}

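/**
 * Download the file at the given URI to the given directory inside the benchmark venv using a
 * generated python script
 */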
void placeDownloadFile(String sourceUri, String destDir) {
var query = """
text = '''
import requests, os
dest = '${destDir}/' + os.path.basename('${sourceUri}')
r = requests.get('${sourceUri}', allow_redirects=True)
open(dest, 'wb').write(r.content)
'''
save_file('install-file.py', text)
run_script('./bin/python', 'install-file.py')
""";
query = query.replace("${sourceUri}", sourceUri);
query = query.replace("${destDir}", destDir);
api.query(query).execute();
}


/**
* Run the test in Deephaven proper. Do not push to the command line.
*
@@ -143,7 +197,7 @@ Result runDeephavenTest(String name, String setup, String operation, String main
var query = """
begin_time = time.perf_counter_ns()
${setupQueries}
result = ${operation}
${operation}
op_duration = time.perf_counter_ns() - begin_time
stats = new_table([
@@ -166,18 +220,29 @@ Result runDeephavenTest(String name, String setup, String operation, String main
*/
Result runPythonTest(String name, String setup, String operation, String mainSizeGetter, String resultSizeGetter) {
var query = """
text = '''#!/usr/bin/env bash
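# run the given python script in the venv, first loading any env vars (like JAVA_HOME) saved during setup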
touch ENV_VARS.sh
source ENV_VARS.sh
./bin/python $1
'''
save_file('run-benchmark-test.sh', text)
""";
api.query(query).execute();

query = """
text = '''import time
begin_time = time.perf_counter_ns()
${setupQueries}
result = ${operation}
begin_time = time.perf_counter_ns()
${operation}
op_duration = time.perf_counter_ns() - begin_time
main_size = ${mainSizeGetter}
result_size = ${resultSizeGetter}
print("-- Test Results --")
print("{", "'duration':", op_duration, ", 'main_size':", main_size, ", 'result_size':", result_size, "}")
'''
result = run_script('./bin/python', 'benchmark-test.py', text)
save_file('benchmark-test.py', text)
result = run_script('./run-benchmark-test.sh', 'benchmark-test.py')
result = eval(result.splitlines()[-1])
stats = new_table([
@@ -224,11 +289,16 @@ void initialize(Object testInst) {
user_home = str(Path.home())
benchmark_home = user_home + '/deephaven-benchmark-venv'
def run_script(runner, script_name, script_text):
def save_file(file_name, file_text):
    os.makedirs(benchmark_home, exist_ok=True)
    with open(benchmark_home + '/' + script_name, 'w') as f:
        f.write(script_text)
    file_path = benchmark_home + '/' + file_name
    with open(file_path, 'w') as f:
        f.write(file_text)
    st = os.stat(file_path)
    os.chmod(file_path, st.st_mode | stat.S_IEXEC)
def run_script(runner, script_name):
    command_array = [runner, script_name]
    output = subprocess.check_output(command_array, cwd=benchmark_home).decode("utf-8")
    print(output)
52 changes: 52 additions & 0 deletions src/it/java/io/deephaven/benchmark/tests/compare/Setup.java
@@ -0,0 +1,52 @@
package io.deephaven.benchmark.tests.compare;

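/**
 * Common environment setup shared by the product comparison tests. Currently supplies the PyFlink
 * table environment configuration and the extra jars PyFlink needs to read parquet data
 */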
public class Setup {
static public String flink(CompareTestRunner r) {
addDownloadJar(r, "flink", "flink-sql-parquet", "1.17.1");
addDownloadJar(r, "flink", "flink-oss-fs-hadoop", "1.17.1");
addDownloadJar(r, "hadoop", "hadoop-mapreduce-client-core", "2.10.2");

var q = """
import time, os
import pandas as pd
from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("taskmanager.memory.process.size", "24g")
t_env.get_config().set("jobmanager.memory.process.size", "24g")
t_env.get_config().set("python.fn-execution.arrow.batch.size", "10000000")
t_env.get_config().set("python.fn-execution.bundle.size", "10000000")
t_env.get_config().set("python.state.cache-size", "10000000")
t_env.get_config().set("python.map-state.iterate-response-batch-size", "10000000")
t_env.get_config().set("python.map-state.read-cache-size", "10000000")
t_env.get_config().set("python.map-state.write-cache-size", "10000000")
t_env.get_config().set("python.metric.enabled", "false")
t_env.get_config().set("python.operator-chaining.enabled", "true")
os.system('rm -rf /data/results.csv')
def count_rows(table_name):
    sname = table_name + '_stats'
    stats_dir = '/data/' + sname + '.csv'
    os.system('rm -rf ' + stats_dir)
    t_env.execute_sql("CREATE TABLE " + sname + "(row_count BIGINT) WITH ('connector'='filesystem','path'='" + stats_dir + "','format'='csv')")
    t_env.execute_sql("INSERT INTO " + sname + " SELECT count(*) AS row_count FROM " + table_name).wait()
    count = 0
    for r in t_env.from_path(sname).execute().collect():
        count = r[0]
    return count
""";
return q;
}

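/**
 * Register an Apache artifact jar from Maven Central to be downloaded into the PyFlink lib directory
 */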
static public void addDownloadJar(CompareTestRunner r, String prod, String artifact, String version) {
var destDir = "lib/python3.10/site-packages/pyflink/lib";
var apacheUri = "https://repo1.maven.org/maven2/org/apache/";
var uri = apacheUri + prod + '/' + artifact + '/' + version + '/' + artifact + '-' + version + ".jar";
r.addDownloadFiles(uri, destDir);
}
}
@@ -4,13 +4,17 @@
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import io.deephaven.benchmark.tests.compare.CompareTestRunner;
import io.deephaven.benchmark.tests.compare.Setup;

/**
* Product comparison tests for the average by group operation. Tests read the same parquet data. To avoid an unfair
* advantage where some products may partition or group data during the read, parquet read time is included in the
* benchmark results.
* <p/>
* Each test calculates two new average columns and groups by a string and an integer
* Each test calculates two new average columns and groups by a string and an integer.
* <p/>
* Data generation only happens in the first test, the Deephaven test. Tests can be run individually, but only after
* the desired data has been generated.
*/
@TestMethodOrder(OrderAnnotation.class)
public class AverageByTest {
@@ -23,12 +27,14 @@ public void deephavenAverageBy() {
var setup = """
from deephaven import agg
from deephaven.parquet import read
source = read('/data/source.parquet').select()
aggs = [
agg.avg('Avg1=int250'), agg.avg('Avg2=int640')
]
""";
var op = "source.agg_by(aggs, by=['str250', 'int640'])";
var op = """
source = read('/data/source.parquet').select()
result = source.agg_by(aggs, by=['str250', 'int640'])
""";
var msize = "source.size";
var rsize = "result.size";
runner.test("Deephaven Average By", setup, op, msize, rsize);
@@ -38,11 +44,11 @@ public void deephavenAverageBy() {
@Order(2)
public void pyarrowAverageBy() {
runner.initPython("pyarrow");
var setup = """
import pyarrow.dataset as ds
var setup = "import pyarrow.dataset as ds";
var op = """
source = ds.dataset('/data/source.parquet', format="parquet").to_table()
result = source.group_by(['str250', 'int640']).aggregate([('int250','mean'), ('int640','mean')])
""";
var op = "source.group_by(['str250', 'int640']).aggregate([('int250','mean'), ('int640','mean')])";
var msize = "source.num_rows";
var rsize = "result.num_rows";
runner.test("PyArrow Average By", setup, op, msize, rsize);
@@ -52,12 +58,10 @@ public void pyarrowAverageBy() {
@Order(3)
public void pandasAverageBy() {
runner.initPython("fastparquet", "pandas");
var setup = """
import pandas as pd
source = pd.read_parquet('/data/source.parquet')
""";
var setup = "import pandas as pd";
var op = """
source.groupby(['str250', 'int640']).agg(
source = pd.read_parquet('/data/source.parquet')
result = source.groupby(['str250', 'int640']).agg(
Avg1=pd.NamedAgg('int250', 'mean'), Avg2=pd.NamedAgg('int640', 'mean')
)
""";
@@ -66,4 +70,21 @@ public void pandasAverageBy() {
runner.test("Pandas Average By", setup, op, msize, rsize);
}

@Test
@Order(4)
@Disabled
public void flinkAverageBy() {
runner.initPython("apache-flink", "jdk-11");
var op = """
t_env.execute_sql("CREATE TABLE source(int250 INT,int640 INT,str250 STRING) WITH ('connector'='filesystem','path'='/data/source.parquet','format'='parquet')")
t_env.execute_sql("CREATE TABLE results(str250 STRING,int640 INT,Avg1 INT,Avg2 INT) WITH ('connector'='filesystem','path'='/data/results.csv','format'='csv')")
#t_env.execute_sql("CREATE TABLE results(str250 STRING,int640 INT,Avg1 INT,Avg2 INT) WITH ('connector'='blackhole')")
t_env.execute_sql("INSERT INTO results SELECT str250,int640,AVG(int250) AS Avg1,AVG(int640) AS Avg2 FROM source GROUP BY str250, int640").wait()
""";

var msize = "count_rows('source')";
var rsize = "count_rows('results')"; // Change to 1 for using blackhole connector
runner.test("Flink Average By", Setup.flink(runner), op, msize, rsize);
}

}