Skip to content

Commit

Permalink
GH-15107: [C++][Parquet] Parquet Encoder: Support RLE for Boolean (#3…
Browse files Browse the repository at this point in the history
…4526)

Create RLE Encoder for Boolean.

### Rationale for this change

Boolean current can only use Plain Encoder, here it support RLE.

### What changes are included in this PR?

Create encoder

### Are these changes tested?

Yes

### Are there any user-facing changes?

Yes, user can use new kind of encoding.

* Closes: #15107

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
mapleFU authored Mar 21, 2023
1 parent f2c3928 commit aa8a118
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/bit_stream_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BitWriter {
/// Writes v to the next aligned byte using num_bytes. If T is larger than
/// num_bytes, the extra high-order bytes will be ignored. Returns false if
/// there was not enough space.
/// Assume the v is stored in buffer_ as a litte-endian format
/// Assume the v is stored in buffer_ as a little-endian format
template <typename T>
bool PutAligned(T v, int num_bytes);

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/endian.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ static inline T FromLittleEndian(T value) {
}
#endif

// Handle endianness in *word* granuality (keep individual array element untouched)
// Handle endianness in *word* granularity (keep individual array element untouched)
namespace little_endian {

namespace detail {
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/util/rle_encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ class RleEncoder {
// bit-packed repeated value
int min_repeated_run_size =
1 + static_cast<int>(::arrow::bit_util::BytesForBits(bit_width));
int repeated_max_size = static_cast<int>(::arrow::bit_util::CeilDiv(num_values, 8)) *
min_repeated_run_size;
int repeated_max_size = num_runs * min_repeated_run_size;

return std::max(literal_max_size, repeated_max_size);
}
Expand Down
162 changes: 151 additions & 11 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,15 @@ struct DictEncoderTraits<FLBAType> {
// Initially 1024 elements
static constexpr int32_t kInitialHashTableSize = 1 << 10;

int RlePreserveBufferSize(int num_values, int bit_width) {
// Note: because of the way RleEncoder::CheckBufferFull()
// is called, we have to reserve an extra "RleEncoder::MinBufferSize"
// bytes. These extra bytes won't be used but not reserving them
// would cause the encoder to fail.
return ::arrow::util::RleEncoder::MaxBufferSize(bit_width, num_values) +
::arrow::util::RleEncoder::MinBufferSize(bit_width);
}

/// See the dictionary encoding section of
/// https://github.com/Parquet/parquet-format. The encoding supports
/// streaming encoding. Values are encoded as they are added while the
Expand All @@ -480,6 +489,10 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
public:
typedef typename DType::c_type T;

/// In data page, the bit width used to encode the entry
/// ids stored as 1 byte (max bit width = 32).
constexpr static int32_t kDataPageBitWidthBytes = 1;

explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
: EncoderImpl(desc, Encoding::PLAIN_DICTIONARY, pool),
buffered_indices_(::arrow::stl::allocator<int32_t>(pool)),
Expand All @@ -504,22 +517,16 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
encoder.Flush();

ClearIndices();
return 1 + encoder.len();
return kDataPageBitWidthBytes + encoder.len();
}

void set_type_length(int type_length) { this->type_length_ = type_length; }

/// Returns a conservative estimate of the number of bytes needed to encode the buffered
/// indices. Used to size the buffer passed to WriteIndices().
int64_t EstimatedDataEncodedSize() override {
// Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
// reserve
// an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
// but not reserving them would cause the encoder to fail.
return 1 +
::arrow::util::RleEncoder::MaxBufferSize(
bit_width(), static_cast<int>(buffered_indices_.size())) +
::arrow::util::RleEncoder::MinBufferSize(bit_width());
return kDataPageBitWidthBytes +
RlePreserveBufferSize(static_cast<int>(buffered_indices_.size()), bit_width());
}

/// The minimum bit width required to encode the currently buffered indices.
Expand Down Expand Up @@ -2390,6 +2397,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* out) override {
if (null_count != 0) {
// TODO(ARROW-34660): implement DecodeArrow with null slots.
ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
}
std::vector<T> values(num_values);
Expand All @@ -2402,6 +2410,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* out) override {
if (null_count != 0) {
// TODO(ARROW-34660): implement DecodeArrow with null slots.
ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
}
std::vector<T> values(num_values);
Expand Down Expand Up @@ -2838,6 +2847,110 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
std::shared_ptr<ResizableBuffer> buffered_data_;
};

// ----------------------------------------------------------------------
// RLE_BOOLEAN_ENCODER

class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncoder {
public:
explicit RleBooleanEncoder(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
: EncoderImpl(descr, Encoding::RLE, pool),
buffered_append_values_(::arrow::stl::allocator<T>(pool)) {}

int64_t EstimatedDataEncodedSize() override {
return kRleLengthInBytes + MaxRleBufferSize();
}

std::shared_ptr<Buffer> FlushValues() override;

void Put(const T* buffer, int num_values) override;
void Put(const ::arrow::Array& values) override {
if (values.type_id() != ::arrow::Type::BOOL) {
throw ParquetException("RleBooleanEncoder expects BooleanArray, got ",
values.type()->ToString());
}
const auto& boolean_array = checked_cast<const ::arrow::BooleanArray&>(values);
if (values.null_count() == 0) {
for (int i = 0; i < boolean_array.length(); ++i) {
// null_count == 0, so just call Value directly is ok.
buffered_append_values_.push_back(boolean_array.Value(i));
}
} else {
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::BooleanType>(
*boolean_array.data(),
[&](bool value) {
buffered_append_values_.push_back(value);
return Status::OK();
},
[]() { return Status::OK(); }));
}
}

void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
} else {
Put(src, num_values);
}
}

void Put(const std::vector<bool>& src, int num_values) override;

protected:
template <typename SequenceType>
void PutImpl(const SequenceType& src, int num_values);

int MaxRleBufferSize() const noexcept {
return RlePreserveBufferSize(static_cast<int>(buffered_append_values_.size()),
kBitWidth);
}

constexpr static int32_t kBitWidth = 1;
/// 4 bytes in little-endian, which indicates the length.
constexpr static int32_t kRleLengthInBytes = 4;

// std::vector<bool> in C++ is tricky, because it's a bitmap.
// Here RleBooleanEncoder will only append values into it, and
// dump values into Buffer, so using it here is ok.
ArrowPoolVector<bool> buffered_append_values_;
};

void RleBooleanEncoder::Put(const bool* src, int num_values) { PutImpl(src, num_values); }

void RleBooleanEncoder::Put(const std::vector<bool>& src, int num_values) {
PutImpl(src, num_values);
}

template <typename SequenceType>
void RleBooleanEncoder::PutImpl(const SequenceType& src, int num_values) {
for (int i = 0; i < num_values; ++i) {
buffered_append_values_.push_back(src[i]);
}
}

std::shared_ptr<Buffer> RleBooleanEncoder::FlushValues() {
int rle_buffer_size_max = MaxRleBufferSize();
std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes);
::arrow::util::RleEncoder encoder(buffer->mutable_data() + kRleLengthInBytes,
rle_buffer_size_max, /*bit_width*/ kBitWidth);

for (bool value : buffered_append_values_) {
encoder.Put(value ? 1 : 0);
}
encoder.Flush();
::arrow::util::SafeStore(buffer->mutable_data(),
::arrow::bit_util::ToLittleEndian(encoder.len()));
PARQUET_THROW_NOT_OK(buffer->Resize(kRleLengthInBytes + encoder.len()));
buffered_append_values_.clear();
return buffer;
}

// ----------------------------------------------------------------------
// RLE_BOOLEAN_DECODER

Expand All @@ -2855,7 +2968,7 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
" (corrupt data page?)");
}
// Load the first 4 bytes in little-endian, which indicates the length
num_bytes = ::arrow::bit_util::ToLittleEndian(SafeLoadAs<uint32_t>(data));
num_bytes = ::arrow::bit_util::FromLittleEndian(SafeLoadAs<uint32_t>(data));
if (num_bytes < 0 || num_bytes > static_cast<uint32_t>(len - 4)) {
throw ParquetException("Received invalid number of bytes : " +
std::to_string(num_bytes) + " (corrupt data page?)");
Expand Down Expand Up @@ -2883,7 +2996,27 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::Accumulator* out) override {
ParquetException::NYI("DecodeArrow for RleBooleanDecoder");
if (null_count != 0) {
// TODO(ARROW-34660): implement DecodeArrow with null slots.
ParquetException::NYI("RleBoolean DecodeArrow with null slots");
}
constexpr int kBatchSize = 1024;
std::array<bool, kBatchSize> values;
int sum_decode_count = 0;
do {
int current_batch = std::min(kBatchSize, num_values);
int decoded_count = decoder_->GetBatch(values.data(), current_batch);
if (decoded_count == 0) {
break;
}
sum_decode_count += decoded_count;
PARQUET_THROW_NOT_OK(out->Reserve(sum_decode_count));
for (int i = 0; i < decoded_count; ++i) {
PARQUET_THROW_NOT_OK(out->Append(values[i]));
}
num_values -= decoded_count;
} while (num_values > 0);
return sum_decode_count;
}

int DecodeArrow(
Expand Down Expand Up @@ -3265,6 +3398,13 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
default:
throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
}
} else if (encoding == Encoding::RLE) {
switch (type_num) {
case Type::BOOLEAN:
return std::make_unique<RleBooleanEncoder>(descr, pool);
default:
throw ParquetException("RLE only supports BOOLEAN");
}
} else {
ParquetException::NYI("Selected encoding is not supported");
}
Expand Down
98 changes: 98 additions & 0 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,33 @@ class EncodingAdHocTyped : public ::testing::Test {
::arrow::AssertArraysEqual(*values, *result);
}

void Rle(int seed) {
if (!std::is_same<ParquetType, BooleanType>::value) {
return;
}
auto values = GetValues(seed);
auto encoder = MakeTypedEncoder<ParquetType>(Encoding::RLE, /*use_dictionary=*/false,
column_descr());
auto decoder = MakeTypedDecoder<ParquetType>(Encoding::RLE, column_descr());

ASSERT_NO_THROW(encoder->Put(*values));
auto buf = encoder->FlushValues();

int num_values = static_cast<int>(values->length() - values->null_count());
decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));

BuilderType acc(arrow_type(), ::arrow::default_memory_pool());
ASSERT_EQ(num_values,
decoder->DecodeArrow(static_cast<int>(values->length()),
static_cast<int>(values->null_count()),
values->null_bitmap_data(), values->offset(), &acc));

std::shared_ptr<::arrow::Array> result;
ASSERT_OK(acc.Finish(&result));
ASSERT_EQ(50, result->length());
::arrow::AssertArraysEqual(*values, *result);
}

void DeltaBitPack(int seed) {
if (!std::is_same<ParquetType, Int32Type>::value &&
!std::is_same<ParquetType, Int64Type>::value) {
Expand Down Expand Up @@ -909,6 +936,14 @@ TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) {
}
}

TYPED_TEST(EncodingAdHocTyped, RleArrowDirectPut) {
// TODO: test with nulls once RleBooleanDecoder::DecodeArrow supports them
this->null_probability_ = 0;
for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
this->Rle(seed);
}
}

TYPED_TEST(EncodingAdHocTyped, DeltaBitPackArrowDirectPut) {
// TODO: test with nulls once DeltaBitPackDecoder::DecodeArrow supports them
this->null_probability_ = 0;
Expand Down Expand Up @@ -1576,6 +1611,69 @@ TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) {
}
}

// ----------------------------------------------------------------------
// Rle for Boolean encode/decode tests.

class TestRleBooleanEncoding : public TestEncodingBase<BooleanType> {
public:
using c_type = bool;
static constexpr int TYPE = Type::BOOLEAN;

virtual void CheckRoundtrip() {
auto encoder = MakeTypedEncoder<BooleanType>(Encoding::RLE,
/*use_dictionary=*/false, descr_.get());
auto decoder = MakeTypedDecoder<BooleanType>(Encoding::RLE, descr_.get());

for (int i = 0; i < 3; ++i) {
encoder->Put(draws_, num_values_);
encode_buffer_ = encoder->FlushValues();

decoder->SetData(num_values_, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));
int values_decoded = decoder->Decode(decode_buf_, num_values_);
ASSERT_EQ(num_values_, values_decoded);
ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
}
}

void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
auto encoder = MakeTypedEncoder<BooleanType>(Encoding::RLE,
/*use_dictionary=*/false, descr_.get());
auto decoder = MakeTypedDecoder<BooleanType>(Encoding::RLE, descr_.get());
int null_count = 0;
for (auto i = 0; i < num_values_; i++) {
if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
null_count++;
}
}
for (int i = 0; i < 3; ++i) {
encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
encode_buffer_ = encoder->FlushValues();
decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));
auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
valid_bits, valid_bits_offset);
ASSERT_EQ(num_values_, values_decoded);
ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(
decode_buf_, draws_, num_values_, valid_bits, valid_bits_offset));
}
}
};

TEST_F(TestRleBooleanEncoding, BasicRoundTrip) {
ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
/*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
/*null_probability*/ 0.1));
}

TEST_F(TestRleBooleanEncoding, AllNull) {
ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
/*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
/*null_probability*/ 1));
}

// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY encode/decode tests.

Expand Down

0 comments on commit aa8a118

Please sign in to comment.