From 9c8434ae2bd06954e745cf2bdfdf7a800404a011 Mon Sep 17 00:00:00 2001
From: Strongest Number 9 <16169054+StrongestNumber9@users.noreply.github.com>
Date: Thu, 19 Jan 2023 17:23:44 +0200
Subject: [PATCH] Initial public release (#1)

* Initial public release
---
 .github/workflows/maven-publish.yml          |  33 ++
 .gitignore                                   |  11 +
 LICENSE                                      |  30 +-
 pom.xml                                      | 181 +++++++++++
 .../functions/dpf_02/BatchCollect.java       | 246 ++++++++++++++
 .../dpf_02/ConvertIPStringToInt.java         |  75 +++++
 .../functions/dpf_02/SortByClause.java       | 127 ++++++++
 src/test/java/BatchCollectTest.java          | 300 ++++++++++++++++++
 8 files changed, 1001 insertions(+), 2 deletions(-)
 create mode 100644 .github/workflows/maven-publish.yml
 create mode 100644 .gitignore
 create mode 100644 pom.xml
 create mode 100644 src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java
 create mode 100644 src/main/java/com/teragrep/functions/dpf_02/ConvertIPStringToInt.java
 create mode 100644 src/main/java/com/teragrep/functions/dpf_02/SortByClause.java
 create mode 100644 src/test/java/BatchCollectTest.java

diff --git a/.github/workflows/maven-publish.yml b/.github/workflows/maven-publish.yml
new file mode 100644
index 0000000..8fb787b
--- /dev/null
+++ b/.github/workflows/maven-publish.yml
@@ -0,0 +1,33 @@
+name: Maven Package
+
+on:
+  push:
+
+jobs:
+  build:
+
+    runs-on: ubuntu-latest
+    permissions:
+      contents: read
+      packages: write
+
+    steps:
+    - uses: actions/checkout@v3
+      with:
+        fetch-depth: 0
+
+    - name: Set up JDK 8
+      uses: actions/setup-java@v3
+      with:
+        java-version: '8'
+        distribution: 'temurin'
+        server-id: github
+        settings-path: ${{ github.workspace }}
+
+    - name: Get version
+      run: echo "RELEASE_VERSION=$(git describe --tags)" >> $GITHUB_ENV
+
+    - name: Publish to GitHub Packages Apache Maven
+      run: mvn -B -Drevision=${{ env.RELEASE_VERSION }} -Dsha1= -Dchangelist= deploy -s ${{ github.workspace }}/settings.xml
+      env:
+        GITHUB_TOKEN: ${{ github.token }}
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..eb7f1c7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,11 @@
+target/*
+/target/
+.settings/*
+.vscode/*
+.classpath
+.factorypath
+.project
+/dependency-reduced-pom.xml
+.idea/**
+*.iml
+.flattened-pom.xml
diff --git a/LICENSE b/LICENSE
index 0ad25db..05587a3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -633,8 +633,8 @@ the "copyright" line and a pointer to where the full notice is found.
     Copyright (C) <year>  <name of author>
 
     This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU Affero General Public License as published
-    by the Free Software Foundation, either version 3 of the License, or
+    it under the terms of the GNU Affero General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
     (at your option) any later version.
 
     This program is distributed in the hope that it will be useful,
@@ -659,3 +659,29 @@ specific requirements.
 if any, to sign a "copyright disclaimer" for the program, if necessary.
 For more information on this, and how to apply and follow the GNU AGPL, see
 <https://www.gnu.org/licenses/>.
+
+Additional permission under GNU Affero General Public License version 3
+section 7
+
+If you modify this Program, or any covered work, by linking or combining it with
+other code, such other code is not for that reason alone subject to any of the
+requirements of the GNU Affero GPL version 3 as long as this Program is the same
+Program as licensed from Suomen Kanuuna Oy without any additional modifications.
+
+Supplemented terms under GNU Affero General Public License version 3
+section 7
+
+Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+versions must be marked as "Modified version of" The Program.
+
+Names of the licensors and authors may not be used for publicity purposes.
+
+No rights are granted for use of trade names, trademarks, or service marks
+which are in The Program if any.
+
+Licensee must indemnify licensors and authors for any liability that these
+contractual assumptions impose on licensors and authors.
+
+To the extent this program is licensed as part of the Commercial versions of
+Teragrep, the applicable Commercial License may apply to this file if you as
+a licensee so wish it.
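
Note: the workflow above derives the artifact version from `git describe --tags`
and hands it to Maven as -Drevision, while -Dsha1 and -Dchangelist are blanked;
the pom below declares its version as ${revision}${sha1}${changelist}. A tiny
sketch of that composition (defaults taken from the pom's properties block, the
tag value is a made-up example; this class is illustration only, not part of
the patch):

    public class VersionComposition {
        public static void main(String[] args) {
            // defaults from pom.xml: <revision>0.0.1</revision>, <sha1/>, <changelist>-SNAPSHOT</changelist>
            String local = "0.0.1" + "" + "-SNAPSHOT"; // local build -> "0.0.1-SNAPSHOT"
            // CI build: mvn -B -Drevision=$(git describe --tags) -Dsha1= -Dchangelist= deploy
            String release = "1.2.3" + "" + "";        // tagged build -> "1.2.3"
            System.out.println(local + " / " + release);
        }
    }
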
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..dcb3ee5
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <packaging>jar</packaging>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.teragrep</groupId>
+  <artifactId>dpf_02</artifactId>
+  <version>${revision}${sha1}${changelist}</version>
+  <name>dpf_02</name>
+  <url>https://teragrep.com</url>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <java.version>1.8</java.version>
+    <revision>0.0.1</revision>
+    <sha1/>
+    <changelist>-SNAPSHOT</changelist>
+  </properties>
+  <scm>
+    <connection>scm:git:https://github.com/teragrep/dpf_02.git</connection>
+    <developerConnection>scm:git:git@github.com:teragrep/dpf_02.git</developerConnection>
+    <url>https://github.com/teragrep/dpf_02/tree/master</url>
+  </scm>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.12</artifactId>
+      <version>2.4.5</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_2.12</artifactId>
+      <version>2.4.5</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>3.1.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <version>5.4.0-RC1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-launcher</artifactId>
+      <version>1.4.0-RC1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.12</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>5.4.0-RC1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <directory>${project.basedir}/target</directory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <version>0.15</version>
+        <inherited>false</inherited>
+        <executions>
+          <execution>
+            <phase>test</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <addDefaultLicenseMatchers>false</addDefaultLicenseMatchers>
+          <licenses>
+            <license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
+              <notes>Also allow the license url to be https.</notes>
+              <patterns>
+                <pattern>https://github.com/teragrep/teragrep/blob/main/LICENSE</pattern>
+              </patterns>
+            </license>
+          </licenses>
+          <consoleOutput>true</consoleOutput>
+          <excludeSubProjects>false</excludeSubProjects>
+          <excludes>
+            <exclude>.git/**</exclude>
+            <exclude>.gitattributes</exclude>
+            <exclude>.gitignore</exclude>
+            <exclude>.gitmodules</exclude>
+
+            <exclude>.github/workflows/*.yml</exclude>
+            <exclude>toolchains.xml</exclude>
+            <exclude>settings.xml</exclude>
+
+            <exclude>README.adoc</exclude>
+            <exclude>README.md</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.8.1</version>
+        <configuration>
+          <source>8</source>
+          <target>8</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.22.2</version>
+        <configuration>
+          <parallel>all</parallel>
+          <perCoreThreadCount>true</perCoreThreadCount>
+          <threadCount>10</threadCount>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>flatten-maven-plugin</artifactId>
+        <version>1.1.0</version>
+        <configuration>
+          <updatePomFile>true</updatePomFile>
+          <flattenMode>resolveCiFriendliesOnly</flattenMode>
+        </configuration>
+        <executions>
+          <execution>
+            <id>flatten</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>flatten</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>flatten.clean</id>
+            <phase>clean</phase>
+            <goals>
+              <goal>clean</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <distributionManagement>
+    <repository>
+      <id>github</id>
+      <url>https://maven.pkg.github.com/${env.GITHUB_REPOSITORY}</url>
+    </repository>
+  </distributionManagement>
+</project>
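
Note: both Spark dependencies are provided-scope, so dpf_02 expects to run
inside an existing Spark 2.4 / Scala 2.12 application rather than pulling
Spark in itself. For experimenting outside such an application, a local
session along the lines of the tests at the end of this patch works; a
minimal sketch (the class and app name are hypothetical, not part of the
patch):

    import org.apache.spark.sql.SparkSession;

    public class LocalSparkSandbox {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("dpf_02-sandbox") // arbitrary name
                    .master("local[*]")        // in-process Spark, all cores
                    .getOrCreate();
            spark.stop();
        }
    }
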
diff --git a/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java b/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java
new file mode 100644
index 0000000..623e5c9
--- /dev/null
+++ b/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java
@@ -0,0 +1,246 @@
+package com.teragrep.functions.dpf_02;
+
+/*
+ * Teragrep Batch Collect DPF-02
+ * Copyright (C) 2019, 2020, 2021, 2022 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public final class BatchCollect {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchCollect.class);
+    private Dataset<Row> savedDs = null;
+    private final String sortColumn;
+    private final int numberOfRows;
+    private StructType inputSchema;
+    private boolean sortedBySingleColumn = false;
+    private List<SortByClause> listOfSortByClauses = null;
+
+    public BatchCollect(String sortColumn, int numberOfRows) {
+        LOGGER.info("Initialized BatchCollect based on column " + sortColumn + " and a limit of " + numberOfRows
+                + " row(s)");
+        this.sortColumn = sortColumn;
+        this.numberOfRows = numberOfRows;
+    }
+
+    public BatchCollect(String sortColumn, int numberOfRows, List<SortByClause> listOfSortByClauses) {
+        LOGGER.info("Initialized BatchCollect based on column " + sortColumn + " and a limit of " + numberOfRows + " row(s)."
+                + " SortByClauses included: " + (listOfSortByClauses != null ? listOfSortByClauses.size() : ""));
+        this.sortColumn = sortColumn;
+        this.numberOfRows = numberOfRows;
+
+        this.listOfSortByClauses = listOfSortByClauses;
+    }
+
+    /**
+     * Returns the batchCollected dataframe.
+     * @param df Dataframe to perform batch collection (and sorting) on
+     * @param id identification number for the dataframe, usually batch ID
+     * @param skipLimiting Skips limiting of initial dataset, use for aggregated datasets
+     * @return sorted dataset
+     */
+    public Dataset<Row> call(Dataset<Row> df, Long id, boolean skipLimiting) {
+        if (skipLimiting) {
+            this.processAggregated(df);
+        }
+        else {
+            this.collect(df, id);
+        }
+
+        return this.savedDs;
+    }
+
+    public void collect(Dataset<Row> batchDF, Long batchId) {
+        // check that sortColumn (_time) exists,
+        // and get the sortColId
+        // otherwise, no sorting will be done.
+        if (this.inputSchema == null) {
+            this.inputSchema = batchDF.schema();
+        }
+
+        if (this.listOfSortByClauses == null || this.listOfSortByClauses.size() < 1) {
+            for (String field : this.inputSchema.fieldNames()) {
+                if (field.equals(this.sortColumn)) {
+                    this.sortedBySingleColumn = true;
+                    break;
+                }
+            }
+        }
+
+        List<Row> collected = orderDatasetByGivenColumns(batchDF).limit(numberOfRows).collectAsList();
+        Dataset<Row> createdDsFromCollected = SparkSession.builder().getOrCreate().createDataFrame(collected, this.inputSchema);
+
+        if (this.savedDs == null) {
+            this.savedDs = createdDsFromCollected;
+        }
+        else {
+            this.savedDs = savedDs.union(createdDsFromCollected);
+        }
+
+        this.savedDs = orderDatasetByGivenColumns(this.savedDs).limit(numberOfRows);
+
+    }
+
+    // Call this instead of collect to skip limiting (for aggregatesUsed=true)
+    // TODO remove this
+    public void processAggregated(Dataset<Row> ds) {
+        if (this.inputSchema == null) {
+            this.inputSchema = ds.schema();
+        }
+
+        List<Row> collected = orderDatasetByGivenColumns(ds).limit(numberOfRows).collectAsList();
+        Dataset<Row> createdDsFromCollected = SparkSession.builder().getOrCreate().createDataFrame(collected, this.inputSchema);
+
+        if (this.savedDs == null) {
+            this.savedDs = createdDsFromCollected;
+        }
+        else {
+            this.savedDs = savedDs.union(createdDsFromCollected);
+        }
+
+        this.savedDs = orderDatasetByGivenColumns(this.savedDs).limit(numberOfRows);
+    }
+
+    // Performs orderBy operation on a dataset and returns the ordered one
+    private Dataset<Row> orderDatasetByGivenColumns(Dataset<Row> ds) {
+        final SparkSession ss = SparkSession.builder().getOrCreate();
+
+        if (this.listOfSortByClauses != null && this.listOfSortByClauses.size() > 0) {
+            for (SortByClause sbc : listOfSortByClauses) {
+                if (sbc.getSortAsType() == SortByClause.Type.AUTOMATIC) {
+                    SortByClause.Type autoType = detectSortByType(ds.schema().fields(), sbc.getFieldName());
+                    ds = orderDatasetBySortByClause(ss, ds, sbc, autoType);
+                }
+                else {
+                    ds = orderDatasetBySortByClause(ss, ds, sbc, null);
+                }
+            }
+        }
+        else if (this.sortedBySingleColumn) {
+            ds = ds.orderBy(functions.col(this.sortColumn).desc());
+        }
+
+        return ds;
+    }
+
+    // orderBy based on sortByClause type and if it is descending/ascending
+    private Dataset<Row> orderDatasetBySortByClause(final SparkSession ss, final Dataset<Row> unsorted, final SortByClause sortByClause, final SortByClause.Type overrideSortType) {
+        Dataset<Row> rv = null;
+        SortByClause.Type sortByType = sortByClause.getSortAsType();
+        if (overrideSortType != null) {
+            sortByType = overrideSortType;
+        }
+
+        switch (sortByType) {
+            case DEFAULT:
+                rv = unsorted.orderBy(sortByClause.isDescending() ?
+                        functions.col(sortByClause.getFieldName()).desc() :
+                        functions.col(sortByClause.getFieldName()).asc());
+                break;
+            case STRING:
+                rv = unsorted.orderBy(sortByClause.isDescending() ?
+                        functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).desc() :
+                        functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).asc());
+                break;
+            case NUMERIC:
+                rv = unsorted.orderBy(sortByClause.isDescending() ?
+                        functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).desc() :
+                        functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).asc());
+                break;
+            case IP_ADDRESS:
+                UserDefinedFunction ipStringToIntUDF = functions.udf(new ConvertIPStringToInt(), DataTypes.LongType);
+                ss.udf().register("ip_string_to_int", ipStringToIntUDF);
+                Column sortingCol = functions.callUDF("ip_string_to_int", functions.col(sortByClause.getFieldName()));
+
+                rv = unsorted.orderBy(sortByClause.isDescending() ? sortingCol.desc() : sortingCol.asc());
+                break;
+        }
+        return rv;
+    }
+
+    // detect sorting type if auto() was used in sort
+    private SortByClause.Type detectSortByType(final StructField[] fields, final String fieldName) {
+        for (StructField field : fields) {
+            if (field.name().equals(fieldName)) {
+                switch (field.dataType().typeName()) {
+                    case "string": // ip address?
+                        return SortByClause.Type.STRING;
+                    case "long":
+                    case "integer":
+                    case "float":
+                    case "double":
+                        return SortByClause.Type.NUMERIC;
+                    case "timestamp":
+                        return SortByClause.Type.NUMERIC; // convert to unix epoch?
+                    default:
+                        return SortByClause.Type.DEFAULT;
+                }
+            }
+        }
+        return SortByClause.Type.DEFAULT;
+    }
+
+    // TODO: Remove
+    public List<Row> getCollected() {
+        return this.savedDs.collectAsList();
+    }
+
+    public Dataset<Row> getCollectedAsDataframe() {
+        return this.savedDs;
+    }
+
+    public void clear() {
+        LOGGER.info("dpf_02 cleared");
+        this.savedDs = null;
+        this.inputSchema = null;
+    }
+}
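
Note: BatchCollect keeps a running top-N across micro-batches: each batch is
ordered, trimmed to numberOfRows, unioned with the rows saved so far, and
trimmed again, so savedDs never grows past numberOfRows rows. A minimal usage
sketch, assuming a streaming Dataset<Row> named `events` with a "_time"
column (it mirrors the foreachBatch wiring in BatchCollectTest at the end of
this patch):

    BatchCollect batchCollect = new BatchCollect("_time", 100);
    StreamingQuery query = events
            .writeStream()
            .foreachBatch((batchDF, batchId) -> batchCollect.call(batchDF, batchId, false))
            .outputMode("append")
            .start();
    query.processAllAvailable();
    // newest 100 rows seen across all batches so far:
    Dataset<Row> top = batchCollect.getCollectedAsDataframe();
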
diff --git a/src/main/java/com/teragrep/functions/dpf_02/ConvertIPStringToInt.java b/src/main/java/com/teragrep/functions/dpf_02/ConvertIPStringToInt.java
new file mode 100644
index 0000000..b31ae2d
--- /dev/null
+++ b/src/main/java/com/teragrep/functions/dpf_02/ConvertIPStringToInt.java
@@ -0,0 +1,75 @@
+package com.teragrep.functions.dpf_02;
+
+/*
+ * Teragrep Batch Collect DPF-02
+ * Copyright (C) 2019, 2020, 2021, 2022 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+
+import org.apache.spark.sql.api.java.UDF1;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class ConvertIPStringToInt implements UDF1<Object, Long> {
+    @Override
+    public Long call(Object o) throws Exception {
+        String ip = o.toString();
+
+        InetAddress ip1;
+        long ip_addr1 = 0L;
+
+        try {
+            ip1 = InetAddress.getByName(ip);
+
+        } catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+
+        for (byte b : ip1.getAddress()) {
+            ip_addr1 = ip_addr1 << 8 | (b & 0xFF);
+        }
+
+        return ip_addr1;
+    }
+}
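
Note: ConvertIPStringToInt folds the address bytes into a long, most
significant byte first, which makes dotted-quad addresses sort numerically
instead of lexicographically. A worked example calling the UDF class directly
(the demo class is illustration only, not part of the patch; the values
follow from the shift-or loop above):

    import com.teragrep.functions.dpf_02.ConvertIPStringToInt;

    public class IpSortDemo {
        public static void main(String[] args) throws Exception {
            ConvertIPStringToInt conv = new ConvertIPStringToInt();
            // 192.168.0.1 -> (192<<24) | (168<<16) | (0<<8) | 1 = 3232235521
            System.out.println(conv.call("192.168.0.1")); // 3232235521
            // numeric order puts 10.0.0.1 before 192.168.0.1, unlike string order
            System.out.println(conv.call("10.0.0.1"));    // 167772161
        }
    }

Worth noting that InetAddress.getByName also accepts IPv6 addresses, whose
16-byte form overflows the 64-bit accumulator (only the low 64 bits survive),
so only IPv4 input is converted faithfully.
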
diff --git a/src/main/java/com/teragrep/functions/dpf_02/SortByClause.java b/src/main/java/com/teragrep/functions/dpf_02/SortByClause.java
new file mode 100644
index 0000000..f34c607
--- /dev/null
+++ b/src/main/java/com/teragrep/functions/dpf_02/SortByClause.java
@@ -0,0 +1,127 @@
+package com.teragrep.functions.dpf_02;
+
+/*
+ * Teragrep Batch Collect DPF-02
+ * Copyright (C) 2019, 2020, 2021, 2022 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+
+import java.io.Serializable;
+
+/**
+ * Class for the different sortByClauses in the 'sort' command
+ */
+public class SortByClause implements Serializable {
+    private boolean descending = false; // + or -
+    private String fieldName = null; // sort based on field
+
+    private int limit = 10000; // limit final amount of rows
+
+    public int getLimit() {
+        return limit;
+    }
+
+    public void setLimit(int limit) {
+        this.limit = limit;
+    }
+
+    public enum Type { // type of sorting, e.g. auto(), str(), num(), ip() or just field name (default)
+        DEFAULT, AUTOMATIC, STRING, NUMERIC, IP_ADDRESS
+    }
+
+    private Type sortAsType = Type.DEFAULT; // default is no special sorting
+
+    public boolean isDescending() {
+        return descending;
+    }
+
+    public void setDescending(boolean descending) {
+        this.descending = descending;
+    }
+
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    public void setFieldName(String fieldName) {
+        this.fieldName = fieldName;
+    }
+
+    public Type getSortAsType() {
+        return sortAsType;
+    }
+
+    public void setSortAsType(Type sortAsType) {
+        this.sortAsType = sortAsType;
+    }
+
+    // Builds a string with a similar representation as in the original command
+    @Override
+    public String toString() {
+        String rv = "";
+        if (this.descending) {
+            rv += "-";
+        }
+        else {
+            rv += "+";
+        }
+
+        if (this.sortAsType.equals(Type.AUTOMATIC)) {
+            rv += "auto(" + this.fieldName + ")";
+        }
+        else if (this.sortAsType.equals(Type.STRING)) {
+            rv += "str(" + this.fieldName + ")";
+        }
+        else if (this.sortAsType.equals(Type.DEFAULT)) {
+            rv += this.fieldName;
+        }
+        else if (this.sortAsType.equals(Type.IP_ADDRESS)) {
+            rv += "ip(" + this.fieldName + ")";
+        }
+        else if (this.sortAsType.equals(Type.NUMERIC)) {
+            rv += "num(" + this.fieldName + ")";
+        }
+
+        return rv;
+    }
+}
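
Note: a SortByClause list overrides BatchCollect's single-column default. A
short sketch building the equivalent of a "sort -num(offset)" clause (the
field name is borrowed from the test schema below, otherwise an assumption):

    SortByClause byOffset = new SortByClause();
    byOffset.setFieldName("offset");
    byOffset.setSortAsType(SortByClause.Type.NUMERIC); // cast to double before ordering
    byOffset.setDescending(true);
    byOffset.toString(); // "-num(offset)"

    BatchCollect collect = new BatchCollect("_time", 100,
            java.util.Collections.singletonList(byOffset));
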
diff --git a/src/test/java/BatchCollectTest.java b/src/test/java/BatchCollectTest.java
new file mode 100644
index 0000000..149ce18
--- /dev/null
+++ b/src/test/java/BatchCollectTest.java
@@ -0,0 +1,300 @@
+/*
+ * Teragrep Batch Collect DPF-02
+ * Copyright (C) 2019, 2020, 2021, 2022 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+
+import com.teragrep.functions.dpf_02.BatchCollect;
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.execution.streaming.MemoryStream;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Assertions;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+
+public class BatchCollectTest {
+
+    // see https://stackoverflow.com/questions/56894068/how-to-perform-unit-testing-on-spark-structured-streaming
+    // see ./sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala at 2.4.5
+
+    private static final StructType testSchema = new StructType(
+            new StructField[]{
+                    new StructField("_time", DataTypes.TimestampType, false, new MetadataBuilder().build()),
+                    new StructField("_raw", DataTypes.StringType, false, new MetadataBuilder().build()),
+                    new StructField("index", DataTypes.StringType, false, new MetadataBuilder().build()),
+                    new StructField("sourcetype", DataTypes.StringType, false, new MetadataBuilder().build()),
+                    new StructField("host", DataTypes.StringType, false, new MetadataBuilder().build()),
+                    new StructField("source", DataTypes.StringType, false, new MetadataBuilder().build()),
+                    new StructField("partition", DataTypes.StringType, false, new MetadataBuilder().build()),
+                    new StructField("offset", DataTypes.LongType, false, new MetadataBuilder().build())
+            }
+    );
+
+    //@Test
+    public void testCollect() throws StreamingQueryException, InterruptedException {
+
+        SparkSession sparkSession = SparkSession.builder().master("local[*]").getOrCreate();
+        SQLContext sqlContext = sparkSession.sqlContext();
+
+        sparkSession.sparkContext().setLogLevel("ERROR");
+
+        ExpressionEncoder<Row> encoder = RowEncoder.apply(testSchema);
+        MemoryStream<Row> rowMemoryStream =
+                new MemoryStream<>(1, sqlContext, encoder);
+
+        BatchCollect batchCollect = new BatchCollect("_time", 100, null);
+        Dataset<Row> rowDataset = rowMemoryStream.toDF();
+        StreamingQuery streamingQuery = startStream(rowDataset, batchCollect);
+
+        long run = 0;
+        long counter = 1;
+        while (streamingQuery.isActive()) {
+            //System.out.println(batchCollect.getCollected().size());
+
+            Timestamp time = Timestamp.valueOf(LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC));
+            if (run == 3) {
+                // make run 3 to be latest always
+                time = Timestamp.valueOf(LocalDateTime.ofInstant(Instant.ofEpochSecond(13851486065L+counter),
+                        ZoneOffset.UTC));
+            }
+
+            rowMemoryStream.addData(
+                    // make rows containing counter as offset and run as partition
+                    makeRows(
+                            time,
+                            "data data",
+                            "topic",
+                            "stream",
+                            "host",
+                            "input",
+                            String.valueOf(run),
+                            counter,
+                            1
+                    )
+            );
+
+            // create 20 events for 10 runs
+            if (counter == 20) {
+                run++;
+                counter = 0;
+            }
+            counter++;
+            streamingQuery.processAllAvailable();
+
+            if (run == 10) {
+                // 10 runs only
+                // wait until the source feeds them all?
+                // TODO there must be a better way?
+//                streamingQuery.processAllAvailable();
+                streamingQuery.stop();
+                streamingQuery.awaitTermination();
+            }
+        }
+
+
+        LinkedList<Integer> runs = new LinkedList<>();
+        runs.add(3);
+        runs.add(6);
+        runs.add(7);
+        runs.add(8);
+        runs.add(9);
+        verifyRuns(batchCollect, runs);
+    }
+
+    @Test
+    public void testCollectAsDataframe() throws StreamingQueryException, InterruptedException {
+        SparkSession sparkSession = SparkSession.builder().master("local[*]").getOrCreate();
+        SQLContext sqlContext = sparkSession.sqlContext();
+
+        sparkSession.sparkContext().setLogLevel("ERROR");
+
+        ExpressionEncoder<Row> encoder = RowEncoder.apply(testSchema);
+        MemoryStream<Row> rowMemoryStream =
+                new MemoryStream<>(1, sqlContext, encoder);
+
+        BatchCollect batchCollect = new BatchCollect("_time", 100, null);
+        Dataset<Row> rowDataset = rowMemoryStream.toDF();
+        StreamingQuery streamingQuery = startStream(rowDataset, batchCollect);
+
+        long run = 0;
+        long counter = 0;
+        while (streamingQuery.isActive()) {
+            //System.out.println(batchCollect.getCollected().size());
+
+            Timestamp time = Timestamp.valueOf(LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC));
+            if (run == 3) {
+                // make run 3 to be latest always
+                time = Timestamp.valueOf(LocalDateTime.ofInstant(Instant.ofEpochSecond(13851486065L+counter), ZoneOffset.UTC));
+            }
+
+            rowMemoryStream.addData(
+                    // make rows containing counter as offset and run as partition
+                    makeRows(
+                            time,
+                            "data data",
+                            "topic",
+                            "stream",
+                            "host",
+                            "input",
+                            String.valueOf(run),
+                            counter,
+                            1
+                    )
+            );
+
+            // create 20 events for 10 runs
+            if (counter == 20) {
+                run++;
+                counter = 0;
+            }
+            counter++;
+
+            if (run == 10) {
+                // 10 runs only
+                // wait until the source feeds them all?
+                // TODO there must be a better way?
+                streamingQuery.processAllAvailable();
+                streamingQuery.stop();
+                streamingQuery.awaitTermination();
+            }
+        }
+
+        Dataset<Row> collectedAsDF = batchCollect.getCollectedAsDataframe();
+        collectedAsDF.show(5, true);
+        Assertions.assertTrue(collectedAsDF instanceof Dataset);
+        //Assertions.assertEquals(200, collectedAsDF.count());
+    }
+
+    private Seq<Row> makeRows(Timestamp _time,
+                              String _raw,
+                              String index,
+                              String sourcetype,
+                              String host,
+                              String source,
+                              String partition,
+                              Long offset,
+                              long amount) {
+        ArrayList<Row> rowArrayList = new ArrayList<>();
+
+        Row row = RowFactory.create(
+                _time,
+                _raw,
+                index,
+                sourcetype,
+                host,
+                source,
+                partition,
+                offset
+        );
+
+        while (amount > 0) {
+            rowArrayList.add(row);
+            amount--;
+        }
+
+        Seq<Row> rowSeq = JavaConverters.asScalaIteratorConverter(rowArrayList.iterator()).asScala().toSeq();
+
+        return rowSeq;
+    }
+
+    private StreamingQuery startStream(Dataset<Row> rowDataset, BatchCollect batchCollect) {
+        return rowDataset
+                .writeStream()
+                .foreachBatch(
+                        new VoidFunction2<Dataset<Row>, Long>() {
+                            @Override
+                            public void call(Dataset<Row> batchDF, Long batchId) throws Exception {
+                                batchCollect.collect(batchDF, batchId);
+                            }
+                        }
+                )
+                .outputMode("append")
+                .start();
+    }
+
+    private void verifyRuns(BatchCollect batchCollect, LinkedList<Integer> runs) {
+        // test that 0-4 batches added data to 100 slots
+        List<Row> collectedList = batchCollect.getCollected();
+
+        TreeMap<Integer, Long> runToRow = new TreeMap<>();
+
+        int arraySize = collectedList.size();
+        while (arraySize != 0) {
+            Row row = collectedList.get(arraySize - 1);
+            int rowRun = Integer.parseInt(row.getString(6));
+
+            if (runToRow.containsKey(rowRun)) {
+                long value = runToRow.get(rowRun);
+                value++;
+                runToRow.put(rowRun, value);
+            }
+            else {
+                runToRow.put(rowRun, 1L);
+            }
+            arraySize--;
+
+        }
+
+        for (int run : runs) {
+            Assertions.assertEquals(20, runToRow.get(run), "batch " + run + " contained other than 20 messages");
+        }
+    }
+}
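
Note on the survivors expected in testCollect: each run feeds 20 rows and the
collector keeps the 100 newest rows by _time. Run 3 is pinned to a far-future
timestamp, so its 20 rows always stay; the remaining 80 slots go to the rows
with the newest wall-clock timestamps, which are the last four runs written.
A small arithmetic sketch of why verifyRuns checks exactly runs 3, 6, 7, 8
and 9:

    int cap = 100, rowsPerRun = 20;
    int pinned = rowsPerRun;                      // run 3, forced newest
    int newestRuns = (cap - pinned) / rowsPerRun; // 4 -> runs 6, 7, 8, 9
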