From f9cf28887129e6e6e338eca7f22b5b560f66f524 Mon Sep 17 00:00:00 2001 From: eemhu <125959687+eemhu@users.noreply.github.com> Date: Thu, 4 Jan 2024 11:43:24 +0200 Subject: [PATCH] change BloomFilterAggregator to use Array(BinaryType) instead of Array(Array(ByteType)) (#24) --- .../functions/dpf_03/BloomFilterAggregator.scala | 13 +++++++------ src/test/scala/BloomFilterAggregatorTest.scala | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala index 0765ee8..a0e2c3d 100644 --- a/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala +++ b/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala @@ -57,8 +57,10 @@ import scala.collection.mutable import scala.reflect.ClassTag import scala.collection.JavaConverters._ -class BloomFilterAggregator(final val columnName: String, final val estimateName: String, sortedSizeMap: java.util.SortedMap[java.lang.Long, java.lang.Double]) extends Aggregator[Row, BloomFilter, Array[Byte]] - with Serializable { +class BloomFilterAggregator(final val columnName: String, + final val estimateName: String, + sortedSizeMap: java.util.SortedMap[java.lang.Long, java.lang.Double]) + extends Aggregator[Row, BloomFilter, Array[Byte]] with Serializable { var tokenizer: Option[Tokenizer] = None @@ -68,16 +70,15 @@ class BloomFilterAggregator(final val columnName: String, final val estimateName override def reduce(buffer: BloomFilter, row: Row): BloomFilter = { var newBuffer = buffer - val tokens : mutable.WrappedArray[mutable.WrappedArray[Byte]] = row.getAs[mutable.WrappedArray[mutable.WrappedArray[Byte]]](columnName) + val tokens : mutable.WrappedArray[Array[Byte]] = row.getAs[mutable.WrappedArray[Array[Byte]]](columnName) val estimate: Long = row.getAs[Long](estimateName) if (newBuffer.bitSize() == 64) { // zero() will have 64 bitSize newBuffer = selectFilterFromMap(estimate) } - for (token : mutable.WrappedArray[Byte] <- tokens) { - val tokenByteArray: Array[Byte] = token.toArray - newBuffer.putBinary(tokenByteArray) + for (token : Array[Byte] <- tokens) { + newBuffer.putBinary(token) } newBuffer diff --git a/src/test/scala/BloomFilterAggregatorTest.scala b/src/test/scala/BloomFilterAggregatorTest.scala index ba92450..b384f59 100644 --- a/src/test/scala/BloomFilterAggregatorTest.scala +++ b/src/test/scala/BloomFilterAggregatorTest.scala @@ -97,7 +97,7 @@ class BloomFilterAggregatorTest { // create Scala udf - val tokenizerUDF = functions.udf(new TokenizerUDF, DataTypes.createArrayType(DataTypes.createArrayType(ByteType, false), false)) + val tokenizerUDF = functions.udf(new TokenizerUDF, DataTypes.createArrayType(DataTypes.BinaryType, false)) // register udf sparkSession.udf.register("tokenizer_udf", tokenizerUDF)