Skip to content

Commit

Permalink
change BloomFilterAggregator to use Array(BinaryType) instead of Arra…
Browse files Browse the repository at this point in the history
…y(Array(ByteType)) (#24)
  • Loading branch information
eemhu authored Jan 4, 2024
1 parent bdbb605 commit f9cf288
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ import scala.collection.mutable
import scala.reflect.ClassTag
import scala.collection.JavaConverters._

class BloomFilterAggregator(final val columnName: String, final val estimateName: String, sortedSizeMap: java.util.SortedMap[java.lang.Long, java.lang.Double]) extends Aggregator[Row, BloomFilter, Array[Byte]]
with Serializable {
class BloomFilterAggregator(final val columnName: String,
final val estimateName: String,
sortedSizeMap: java.util.SortedMap[java.lang.Long, java.lang.Double])
extends Aggregator[Row, BloomFilter, Array[Byte]] with Serializable {

var tokenizer: Option[Tokenizer] = None

Expand All @@ -68,16 +70,15 @@ class BloomFilterAggregator(final val columnName: String, final val estimateName

override def reduce(buffer: BloomFilter, row: Row): BloomFilter = {
var newBuffer = buffer
val tokens : mutable.WrappedArray[mutable.WrappedArray[Byte]] = row.getAs[mutable.WrappedArray[mutable.WrappedArray[Byte]]](columnName)
val tokens : mutable.WrappedArray[Array[Byte]] = row.getAs[mutable.WrappedArray[Array[Byte]]](columnName)
val estimate: Long = row.getAs[Long](estimateName)

if (newBuffer.bitSize() == 64) { // zero() will have 64 bitSize
newBuffer = selectFilterFromMap(estimate)
}

for (token : mutable.WrappedArray[Byte] <- tokens) {
val tokenByteArray: Array[Byte] = token.toArray
newBuffer.putBinary(tokenByteArray)
for (token : Array[Byte] <- tokens) {
newBuffer.putBinary(token)
}

newBuffer
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/BloomFilterAggregatorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class BloomFilterAggregatorTest {


// create Scala udf
val tokenizerUDF = functions.udf(new TokenizerUDF, DataTypes.createArrayType(DataTypes.createArrayType(ByteType, false), false))
val tokenizerUDF = functions.udf(new TokenizerUDF, DataTypes.createArrayType(DataTypes.BinaryType, false))
// register udf
sparkSession.udf.register("tokenizer_udf", tokenizerUDF)

Expand Down

0 comments on commit f9cf288

Please sign in to comment.