diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp
index 0cb5d8b13df..b4e37767ba9 100644
--- a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp
+++ b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp
@@ -129,7 +129,8 @@ void DataTypeAggregateFunction::deserializeBinaryBulk(
     IColumn & column,
     ReadBuffer & istr,
     size_t limit,
-    double /*avg_value_size_hint*/) const
+    double /*avg_value_size_hint*/,
+    const IColumn::Filter * filter) const
 {
     ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(column);
     ColumnAggregateFunction::Container & vec = real_column.getData();
@@ -145,6 +146,9 @@ void DataTypeAggregateFunction::deserializeBinaryBulk(
         if (istr.eof())
             break;
 
+        // There is no need to keep rows that the filter drops. But since we do not
+        // know the size of each serialized state, we still have to deserialize it.
+        // TODO: Optimize this.
         AggregateDataPtr place = arena.alloc(size_of_state);
         function->create(place);
@@ -159,7 +163,10 @@ void DataTypeAggregateFunction::deserializeBinaryBulk(
             throw;
         }
 
-        vec.push_back(place);
+        if (filter && !(*filter)[i])
+            function->destroy(place);
+        else
+            vec.push_back(place);
     }
 }
diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.h b/dbms/src/DataTypes/DataTypeAggregateFunction.h
index d21db9f485b..21d46428ecb 100644
--- a/dbms/src/DataTypes/DataTypeAggregateFunction.h
+++ b/dbms/src/DataTypes/DataTypeAggregateFunction.h
@@ -63,8 +63,12 @@ class DataTypeAggregateFunction final : public IDataType
     void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
     void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
     void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
-        const override;
+    void deserializeBinaryBulk(
+        IColumn & column,
+        ReadBuffer & istr,
+        size_t limit,
+        double avg_value_size_hint,
+        const IColumn::Filter * filter) const override;
     void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
     void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
     void deserializeTextEscaped(IColumn & column, ReadBuffer & istr) const override;
diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp
index 8bff3789c8a..4dbd52b4fe1 100644
--- a/dbms/src/DataTypes/DataTypeArray.cpp
+++ b/dbms/src/DataTypes/DataTypeArray.cpp
@@ -223,7 +223,8 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
     size_t limit,
     double /*avg_value_size_hint*/,
     bool position_independent_encoding,
-    SubstreamPath & path) const
+    SubstreamPath & path,
+    const IColumn::Filter * filter) const
 {
     ColumnArray & column_array = typeid_cast<ColumnArray &>(column);
@@ -245,7 +246,7 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
             "try to deserialize Array type to non-empty column without position independent encoding, type_name={}",
             getName());
         DataTypeNumber<ColumnArray::Offset>()
-            .deserializeBinaryBulk(column_array.getOffsetsColumn(), *stream, limit, 0);
+            .deserializeBinaryBulk(column_array.getOffsetsColumn(), *stream, limit, 0, filter);
     }
 }
@@ -269,7 +270,8 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
         nested_limit,
         0,
         position_independent_encoding,
-        path);
+        path,
+        filter);
 
     /// Check consistency between offsets and elements subcolumns.
     /// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER.
diff --git a/dbms/src/DataTypes/DataTypeArray.h b/dbms/src/DataTypes/DataTypeArray.h
index d6facb36d15..335e66ffde9 100644
--- a/dbms/src/DataTypes/DataTypeArray.h
+++ b/dbms/src/DataTypes/DataTypeArray.h
@@ -81,7 +81,8 @@ class DataTypeArray final : public IDataType
         size_t limit,
         double avg_value_size_hint,
         bool position_independent_encoding,
-        SubstreamPath & path) const override;
+        SubstreamPath & path,
+        const IColumn::Filter * filter) const override;
 
     MutableColumnPtr createColumn() const override;
diff --git a/dbms/src/DataTypes/DataTypeDecimal.cpp b/dbms/src/DataTypes/DataTypeDecimal.cpp
index 1f5c3a8e27c..0211db7f138 100644
--- a/dbms/src/DataTypes/DataTypeDecimal.cpp
+++ b/dbms/src/DataTypes/DataTypeDecimal.cpp
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #include <...>
+#include <Columns/ColumnsCommon.h>
 #include <...>
 #include <...>
 #include <...>
@@ -79,13 +80,50 @@ void DataTypeDecimal<T>::deserializeBinaryBulk(
     IColumn & column,
     ReadBuffer & istr,
     size_t limit,
-    double /*avg_value_size_hint*/) const
+    double /*avg_value_size_hint*/,
+    const IColumn::Filter * filter) const
 {
-    typename ColumnType::Container & x = typeid_cast<ColumnType &>(column).getData();
-    size_t initial_size = x.size();
-    x.resize(initial_size + limit);
-    size_t size = istr.readBig(reinterpret_cast<char *>(&x[initial_size]), sizeof(FieldType) * limit);
-    x.resize(initial_size + size / sizeof(FieldType));
+    auto & x = typeid_cast<ColumnType &>(column).getData();
+    size_t current_size = x.size();
+    if (!filter)
+    {
+        x.resize(current_size + limit);
+        size_t size = istr.readBig(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * limit);
+        x.resize(current_size + size / sizeof(FieldType));
+        return;
+    }
+
+    const size_t passed = countBytesInFilter(filter->data(), limit);
+    x.resize(current_size + passed);
+    UInt8 prev = (*filter)[0];
+    size_t count = 1;
+    for (size_t i = 1; i < limit; ++i)
+    {
+        bool break_point = ((*filter)[i] != prev);
+        if (break_point && prev)
+        {
+            size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * count);
+            current_size += size / sizeof(FieldType);
+            count = 1;
+        }
+        else if (break_point && !prev)
+        {
+            istr.ignore(sizeof(FieldType) * count);
+            count = 1;
+        }
+        else
+        {
+            ++count;
+        }
+        prev = (*filter)[i];
+    }
+    if (prev)
+    {
+        size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * count);
+        current_size += size / sizeof(FieldType);
+    }
+
+    x.resize(current_size);
 }
 
 template <typename T>
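The run-based filtered read above recurs almost verbatim in DataTypeEnum, DataTypeFixedString, and DataTypeNumberBase below. As a reviewer's aid, here is a minimal standalone sketch of the same technique. This is not the TiFlash code: it assumes a plain in-memory buffer in place of ReadBuffer and int64_t values in place of FieldType. Consecutive equal filter bytes form runs; kept runs are bulk-copied, dropped runs are skipped.

    #include <cstdint>
    #include <vector>

    // Values are fixed-size and laid out back to back, one per filter byte.
    std::vector<int64_t> filteredRead(const int64_t * serialized, const std::vector<uint8_t> & filter)
    {
        std::vector<int64_t> out;
        size_t i = 0;
        const size_t limit = filter.size();
        while (i < limit)
        {
            size_t run_end = i;
            while (run_end < limit && filter[run_end] == filter[i])
                ++run_end; // one run of equal filter bytes
            if (filter[i]) // kept run: bulk-copy the values
                out.insert(out.end(), serialized + i, serialized + run_end);
            // dropped run: nothing to copy, just advance (istr.ignore() in the real code)
            i = run_end;
        }
        return out;
    }

Reading whole runs preserves large bulk copies for dense filters instead of branching on every value, which is why the diff tracks `prev`/`count` rather than testing each row separately.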
diff --git a/dbms/src/DataTypes/DataTypeDecimal.h b/dbms/src/DataTypes/DataTypeDecimal.h
index f155faaea23..e0b9d4d4642 100644
--- a/dbms/src/DataTypes/DataTypeDecimal.h
+++ b/dbms/src/DataTypes/DataTypeDecimal.h
@@ -138,8 +138,12 @@ class DataTypeDecimal final : public IDataType
     void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
     void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
-        const override;
+    void deserializeBinaryBulk(
+        IColumn & column,
+        ReadBuffer & istr,
+        size_t limit,
+        double avg_value_size_hint,
+        const IColumn::Filter * filter) const override;
 
     void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
diff --git a/dbms/src/DataTypes/DataTypeEnum.cpp b/dbms/src/DataTypes/DataTypeEnum.cpp
index 46413140315..dd941b067fd 100644
--- a/dbms/src/DataTypes/DataTypeEnum.cpp
+++ b/dbms/src/DataTypes/DataTypeEnum.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <Columns/ColumnsCommon.h>
 #include <...>
 #include <...>
 #include <...>
@@ -225,13 +226,50 @@ void DataTypeEnum<Type>::deserializeBinaryBulk(
     IColumn & column,
     ReadBuffer & istr,
     size_t limit,
-    double /*avg_value_size_hint*/) const
+    double /*avg_value_size_hint*/,
+    const IColumn::Filter * filter) const
 {
     auto & x = typeid_cast<ColumnType &>(column).getData();
-    const auto initial_size = x.size();
-    x.resize(initial_size + limit);
-    const auto size = istr.readBig(reinterpret_cast<char *>(&x[initial_size]), sizeof(FieldType) * limit);
-    x.resize(initial_size + size / sizeof(FieldType));
+    size_t current_size = x.size();
+    if (!filter)
+    {
+        x.resize(current_size + limit);
+        size_t size = istr.readBig(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * limit);
+        x.resize(current_size + size / sizeof(FieldType));
+        return;
+    }
+
+    const size_t passed = countBytesInFilter(filter->data(), limit);
+    x.resize(current_size + passed);
+    UInt8 prev = (*filter)[0];
+    size_t count = 1;
+    for (size_t i = 1; i < limit; ++i)
+    {
+        bool break_point = ((*filter)[i] != prev);
+        if (break_point && prev)
+        {
+            size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * count);
+            current_size += size / sizeof(FieldType);
+            count = 1;
+        }
+        else if (break_point && !prev)
+        {
+            istr.ignore(sizeof(FieldType) * count);
+            count = 1;
+        }
+        else
+        {
+            ++count;
+        }
+        prev = (*filter)[i];
+    }
+    if (prev)
+    {
+        size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * count);
+        current_size += size / sizeof(FieldType);
+    }
+
+    x.resize(current_size);
 }
 
 template <typename Type>
diff --git a/dbms/src/DataTypes/DataTypeEnum.h b/dbms/src/DataTypes/DataTypeEnum.h
index d409736afd7..aa2f630d2e2 100644
--- a/dbms/src/DataTypes/DataTypeEnum.h
+++ b/dbms/src/DataTypes/DataTypeEnum.h
@@ -131,8 +131,12 @@ class DataTypeEnum final : public IDataTypeEnum
     void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override;
 
     void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
-        const override;
+    void deserializeBinaryBulk(
+        IColumn & column,
+        ReadBuffer & istr,
+        size_t limit,
+        double avg_value_size_hint,
+        const IColumn::Filter * filter) const override;
 
     MutableColumnPtr createColumn() const override { return ColumnType::create(); }
diff --git a/dbms/src/DataTypes/DataTypeFixedString.cpp b/dbms/src/DataTypes/DataTypeFixedString.cpp
index fbbab054fc8..21a20368804 100644
--- a/dbms/src/DataTypes/DataTypeFixedString.cpp
+++ b/dbms/src/DataTypes/DataTypeFixedString.cpp
@@ -14,6 +14,7 @@
 
 #include <...>
 #include <...>
+#include <Columns/ColumnsCommon.h>
 #include <...>
 #include <...>
 #include <...>
@@ -105,19 +106,54 @@ void DataTypeFixedString::deserializeBinaryBulk(
     IColumn & column,
     ReadBuffer & istr,
     size_t limit,
-    double /*avg_value_size_hint*/) const
+    double /*avg_value_size_hint*/,
+    const IColumn::Filter * filter) const
 {
-    ColumnFixedString::Chars_t & data = typeid_cast<ColumnFixedString &>(column).getChars();
+    auto & data = typeid_cast<ColumnFixedString &>(column).getChars();
+    size_t current_size = data.size();
+    if (!filter)
+    {
+        data.resize(current_size + limit * n);
+        size_t size = istr.readBig(reinterpret_cast<char *>(&data[current_size]), n * limit);
 
-    size_t initial_size = data.size();
-    size_t max_bytes = limit * n;
-    data.resize(initial_size + max_bytes);
-    size_t read_bytes = istr.readBig(reinterpret_cast<char *>(&data[initial_size]), max_bytes);
+        if (size % n != 0)
+            throw Exception("Cannot read all data of type FixedString", ErrorCodes::CANNOT_READ_ALL_DATA);
+
+        data.resize(current_size + size);
+        return;
+    }
 
-    if (read_bytes % n != 0)
-        throw Exception("Cannot read all data of type FixedString", ErrorCodes::CANNOT_READ_ALL_DATA);
+    const size_t passed = countBytesInFilter(filter->data(), limit);
+    data.resize(current_size + passed * n);
+    UInt8 prev = (*filter)[0];
+    size_t count = 1;
+    for (size_t i = 1; i < limit; ++i)
+    {
+        bool break_point = ((*filter)[i] != prev);
+        if (break_point && prev)
+        {
+            size_t size = istr.read(reinterpret_cast<char *>(&data[current_size]), n * count);
+            current_size += size;
+            count = 1;
+        }
+        else if (break_point && !prev)
+        {
+            istr.ignore(n * count);
+            count = 1;
+        }
+        else
+        {
+            ++count;
+        }
+        prev = (*filter)[i];
+    }
+    if (prev)
+    {
+        size_t size = istr.read(reinterpret_cast<char *>(&data[current_size]), n * count);
+        current_size += size;
+    }
 
-    data.resize(initial_size + read_bytes);
+    data.resize(current_size);
 }
diff --git a/dbms/src/DataTypes/DataTypeFixedString.h b/dbms/src/DataTypes/DataTypeFixedString.h
index 243d228a8e1..d8c4d8d2a14 100644
--- a/dbms/src/DataTypes/DataTypeFixedString.h
+++ b/dbms/src/DataTypes/DataTypeFixedString.h
@@ -58,8 +58,12 @@ class DataTypeFixedString final : public IDataType
     void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
     void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
-        const override;
+    void deserializeBinaryBulk(
+        IColumn & column,
+        ReadBuffer & istr,
+        size_t limit,
+        double avg_value_size_hint,
+        const IColumn::Filter * filter) const override;
 
     void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
diff --git a/dbms/src/DataTypes/DataTypeNothing.cpp b/dbms/src/DataTypes/DataTypeNothing.cpp
index 57908e07b3e..e48159802c6 100644
--- a/dbms/src/DataTypes/DataTypeNothing.cpp
+++ b/dbms/src/DataTypes/DataTypeNothing.cpp
@@ -42,7 +42,8 @@ void DataTypeNothing::deserializeBinaryBulk(
     IColumn & column,
     ReadBuffer & istr,
     size_t limit,
-    double /*avg_value_size_hint*/) const
+    double,
+    const IColumn::Filter *) const
 {
     typeid_cast<ColumnNothing &>(column).addSize(istr.tryIgnore(limit));
 }
diff --git a/dbms/src/DataTypes/DataTypeNothing.h b/dbms/src/DataTypes/DataTypeNothing.h
index 195e882d916..89397e7af3d 100644
--- a/dbms/src/DataTypes/DataTypeNothing.h
+++ b/dbms/src/DataTypes/DataTypeNothing.h
@@ -36,7 +36,7 @@ class DataTypeNothing final : public IDataTypeDummy
     /// These methods read and write zero bytes just to allow to figure out size of column.
     void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
+    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double, const IColumn::Filter *)
         const override;
 
     bool equals(const IDataType & rhs) const override;
diff --git a/dbms/src/DataTypes/DataTypeNullable.cpp b/dbms/src/DataTypes/DataTypeNullable.cpp
index da504d03382..b82c34a811a 100644
--- a/dbms/src/DataTypes/DataTypeNullable.cpp
+++ b/dbms/src/DataTypes/DataTypeNullable.cpp
@@ -95,13 +95,14 @@ void DataTypeNullable::deserializeBinaryBulkWithMultipleStreams(
     size_t limit,
     double avg_value_size_hint,
     bool position_independent_encoding,
-    SubstreamPath & path) const
+    SubstreamPath & path,
+    const IColumn::Filter * filter) const
 {
     auto & col = static_cast<ColumnNullable &>(column);
     path.emplace_back(Substream::NullMap);
     if (auto * stream = getter(path))
-        DataTypeUInt8().deserializeBinaryBulk(col.getNullMapColumn(), *stream, limit, 0);
+        DataTypeUInt8().deserializeBinaryBulk(col.getNullMapColumn(), *stream, limit, 0, filter);
 
     path.back() = Substream::NullableElements;
     nested_data_type->deserializeBinaryBulkWithMultipleStreams(
@@ -110,7 +111,8 @@ void DataTypeNullable::deserializeBinaryBulkWithMultipleStreams(
         limit,
         avg_value_size_hint,
         position_independent_encoding,
-        path);
+        path,
+        filter);
 }
diff --git a/dbms/src/DataTypes/DataTypeNullable.h b/dbms/src/DataTypes/DataTypeNullable.h
index 61f44ef5884..7f69c26ee7e 100644
--- a/dbms/src/DataTypes/DataTypeNullable.h
+++ b/dbms/src/DataTypes/DataTypeNullable.h
@@ -48,7 +48,8 @@ class DataTypeNullable final : public IDataType
         size_t limit,
         double avg_value_size_hint,
         bool position_independent_encoding,
-        SubstreamPath & path) const override;
+        SubstreamPath & path,
+        const IColumn::Filter * filter) const override;
 
     void serializeBinary(const Field & field, WriteBuffer & ostr) const override
     {
diff --git a/dbms/src/DataTypes/DataTypeNumberBase.cpp b/dbms/src/DataTypes/DataTypeNumberBase.cpp
index 123141521e0..53ec699ed76 100644
--- a/dbms/src/DataTypes/DataTypeNumberBase.cpp
+++ b/dbms/src/DataTypes/DataTypeNumberBase.cpp
@@ -14,6 +14,7 @@
 
 #include <...>
 #include <...>
+#include <Columns/ColumnsCommon.h>
 #include <...>
 #include <...>
 #include <...>
@@ -233,15 +234,51 @@ void DataTypeNumberBase<T>::deserializeBinaryBulk(
     IColumn & column,
     ReadBuffer & istr,
     size_t limit,
-    double /*avg_value_size_hint*/) const
+    double /*avg_value_size_hint*/,
+    const IColumn::Filter * filter) const
 {
-    typename ColumnVector<T>::Container & x = typeid_cast<ColumnVector<T> &>(column).getData();
-    size_t initial_size = x.size();
-    x.resize(initial_size + limit);
-    size_t size = istr.readBig(
-        reinterpret_cast<char *>(&x[initial_size]),
-        sizeof(typename ColumnVector<T>::value_type) * limit);
-    x.resize(initial_size + size / sizeof(typename ColumnVector<T>::value_type));
+    auto & x = typeid_cast<ColumnVector<T> &>(column).getData();
+    size_t current_size = x.size();
+    constexpr auto field_size = sizeof(typename ColumnVector<T>::value_type);
+    if (!filter)
+    {
+        x.resize(current_size + limit);
+        size_t size = istr.readBig(reinterpret_cast<char *>(&x[current_size]), field_size * limit);
+        x.resize(current_size + size / field_size);
+        return;
+    }
+
+    const size_t passed = countBytesInFilter(filter->data(), limit);
+    x.resize(current_size + passed);
+    UInt8 prev = (*filter)[0];
+    size_t count = 1;
+    for (size_t i = 1; i < limit; ++i)
+    {
+        bool break_point = ((*filter)[i] != prev);
+        if (break_point && prev)
+        {
+            size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), field_size * count);
+            current_size += size / field_size;
+            count = 1;
+        }
+        else if (break_point && !prev)
+        {
+            istr.ignore(field_size * count);
+            count = 1;
+        }
+        else
+        {
+            ++count;
+        }
+        prev = (*filter)[i];
+    }
+    if (prev)
+    {
+        size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), field_size * count);
+        current_size += size / field_size;
+    }
+
+    x.resize(current_size);
 }
 
 template <typename T>
diff --git a/dbms/src/DataTypes/DataTypeNumberBase.h b/dbms/src/DataTypes/DataTypeNumberBase.h
index 090d304f9b7..6570e739539 100644
--- a/dbms/src/DataTypes/DataTypeNumberBase.h
+++ b/dbms/src/DataTypes/DataTypeNumberBase.h
@@ -52,8 +52,12 @@ class DataTypeNumberBase : public IDataType
     void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
     void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
     void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
-        const override;
+    void deserializeBinaryBulk(
+        IColumn & column,
+        ReadBuffer & istr,
+        size_t limit,
+        double avg_value_size_hint,
+        const IColumn::Filter * filter) const override;
 
     MutableColumnPtr createColumn() const override;
diff --git a/dbms/src/DataTypes/DataTypeString.cpp b/dbms/src/DataTypes/DataTypeString.cpp
index 90f8e7f80ba..9cb30c04635 100644
--- a/dbms/src/DataTypes/DataTypeString.cpp
+++ b/dbms/src/DataTypes/DataTypeString.cpp
@@ -14,6 +14,7 @@
 
 #include <...>
 #include <...>
+#include <Columns/ColumnsCommon.h>
 #include <...>
 #include <...>
 #include <...>
@@ -120,7 +121,8 @@ static NO_INLINE void deserializeBinarySSE2(
     ColumnString::Chars_t & data,
     ColumnString::Offsets & offsets,
     ReadBuffer & istr,
-    size_t limit)
+    size_t limit,
+    const IColumn::Filter * filter)
 {
     size_t offset = data.size();
     for (size_t i = 0; i < limit; ++i)
     {
         UInt64 size;
         readVarUInt(size, istr);
 
+        if (filter && !(*filter)[i])
+        {
+            istr.ignore(size);
+            continue;
+        }
+
         offset += size + 1;
         offsets.push_back(offset);
@@ -175,7 +183,8 @@ void DataTypeString::deserializeBinaryBulk(
     IColumn & column,
     ReadBuffer & istr,
     size_t limit,
-    double avg_value_size_hint) const
+    double avg_value_size_hint,
+    const IColumn::Filter * filter) const
 {
     ColumnString & column_string = typeid_cast<ColumnString &>(column);
     ColumnString::Chars_t & data = column_string.getChars();
@@ -191,19 +200,20 @@ void DataTypeString::deserializeBinaryBulk(
         avg_chars_size = (avg_value_size_hint - sizeof(offsets[0])) * avg_value_size_hint_reserve_multiplier;
     }
 
-    size_t size_to_reserve = data.size() + static_cast<size_t>(std::ceil(limit * avg_chars_size));
+    const size_t passed = filter ? countBytesInFilter(filter->data(), limit) : limit;
+    size_t size_to_reserve = data.size() + static_cast<size_t>(std::ceil(passed * avg_chars_size));
 
     data.reserve(size_to_reserve);
 
-    offsets.reserve(offsets.size() + limit);
+    offsets.reserve(offsets.size() + passed);
 
     if (avg_chars_size >= 64)
-        deserializeBinarySSE2<4>(data, offsets, istr, limit);
+        deserializeBinarySSE2<4>(data, offsets, istr, limit, filter);
     else if (avg_chars_size >= 48)
-        deserializeBinarySSE2<3>(data, offsets, istr, limit);
+        deserializeBinarySSE2<3>(data, offsets, istr, limit, filter);
     else if (avg_chars_size >= 32)
-        deserializeBinarySSE2<2>(data, offsets, istr, limit);
+        deserializeBinarySSE2<2>(data, offsets, istr, limit, filter);
     else
-        deserializeBinarySSE2<1>(data, offsets, istr, limit);
+        deserializeBinarySSE2<1>(data, offsets, istr, limit, filter);
 }
diff --git a/dbms/src/DataTypes/DataTypeString.h b/dbms/src/DataTypes/DataTypeString.h
index 1bc4ece42dd..877475adb33 100644
--- a/dbms/src/DataTypes/DataTypeString.h
+++ b/dbms/src/DataTypes/DataTypeString.h
@@ -35,8 +35,12 @@ class DataTypeString final : public IDataType
     void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
     void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
-    void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
-        const override;
+    void deserializeBinaryBulk(
+        IColumn & column,
+        ReadBuffer & istr,
+        size_t limit,
+        double avg_value_size_hint,
+        const IColumn::Filter * filter) const override;
 
     void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
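For variable-length values the String path above cannot skip by byte count alone: the varint length must still be decoded for every row, and only the payload of a filtered-out row is skipped (istr.ignore(size)). A minimal standalone sketch under the same assumptions as before (a flat buffer instead of ReadBuffer; the LEB128 decoder below is illustrative, not the real readVarUInt):

    #include <cstdint>
    #include <string>
    #include <vector>

    // Minimal LEB128 varint decoder for the sketch; advances the cursor.
    size_t decodeVarUInt(const uint8_t *& p)
    {
        size_t v = 0;
        for (int shift = 0;; shift += 7)
        {
            uint8_t b = *p++;
            v |= static_cast<size_t>(b & 0x7f) << shift;
            if (!(b & 0x80))
                return v;
        }
    }

    // Each value is a varint length followed by that many bytes.
    std::vector<std::string> readStrings(const uint8_t * p, const std::vector<uint8_t> & filter)
    {
        std::vector<std::string> out;
        for (uint8_t keep : filter)
        {
            size_t size = decodeVarUInt(p); // the length is always decoded
            if (keep)
                out.emplace_back(reinterpret_cast<const char *>(p), size);
            p += size; // skip or advance past the payload either way
        }
        return out;
    }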
streams", diff --git a/dbms/src/DataTypes/IDataType.h b/dbms/src/DataTypes/IDataType.h index b9540aee8f4..f522fde7080 100644 --- a/dbms/src/DataTypes/IDataType.h +++ b/dbms/src/DataTypes/IDataType.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -29,7 +30,6 @@ class WriteBuffer; class IDataType; struct FormatSettingsJSON; -class IColumn; using ColumnPtr = COWPtr::Ptr; using MutableColumnPtr = COWPtr::MutablePtr; @@ -154,6 +154,7 @@ class IDataType : private boost::noncopyable * `avg_value_size_hint` - if not zero, may be used to avoid reallocations while reading column of String type. * `position_independent_encoding` - provide better performance when it is false, but it requires not to be * deserialized the data into a column with existing data. + * `filter` - if not nullptr, then read only values corresponding to 1 in the filter. */ virtual void deserializeBinaryBulkWithMultipleStreams( IColumn & column, @@ -161,10 +162,11 @@ class IDataType : private boost::noncopyable size_t limit, double avg_value_size_hint, bool /*position_independent_encoding*/, - SubstreamPath & path) const + SubstreamPath & path, + const IColumn::Filter * filter) const { if (ReadBuffer * stream = getter(path)) - deserializeBinaryBulk(column, *stream, limit, avg_value_size_hint); + deserializeBinaryBulk(column, *stream, limit, avg_value_size_hint, filter); } void deserializeBinaryBulkWithMultipleStreams( @@ -173,7 +175,8 @@ class IDataType : private boost::noncopyable size_t limit, double avg_value_size_hint, bool position_independent_encoding, - SubstreamPath && path) const + SubstreamPath && path, + const IColumn::Filter * filter = nullptr) const { deserializeBinaryBulkWithMultipleStreams( column, @@ -181,14 +184,19 @@ class IDataType : private boost::noncopyable limit, avg_value_size_hint, position_independent_encoding, - path); + path, + filter); } /** Override these methods for data types that require just single stream (most of data types). */ virtual void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const; - virtual void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) - const; + virtual void deserializeBinaryBulk( + IColumn & column, + ReadBuffer & istr, + size_t limit, + double avg_value_size_hint, + const IColumn::Filter * filter) const; /** Serialization/deserialization of individual values. 
* diff --git a/dbms/src/DataTypes/IDataTypeDummy.h b/dbms/src/DataTypes/IDataTypeDummy.h index 7f1371acb70..53f95cfe661 100644 --- a/dbms/src/DataTypes/IDataTypeDummy.h +++ b/dbms/src/DataTypes/IDataTypeDummy.h @@ -38,7 +38,10 @@ class IDataTypeDummy : public IDataType void serializeBinary(const IColumn &, size_t, WriteBuffer &) const override { throwNoSerialization(); } void deserializeBinary(IColumn &, ReadBuffer &) const override { throwNoSerialization(); } void serializeBinaryBulk(const IColumn &, WriteBuffer &, size_t, size_t) const override { throwNoSerialization(); } - void deserializeBinaryBulk(IColumn &, ReadBuffer &, size_t, double) const override { throwNoSerialization(); } + void deserializeBinaryBulk(IColumn &, ReadBuffer &, size_t, double, const IColumn::Filter *) const override + { + throwNoSerialization(); + } void serializeText(const IColumn &, size_t, WriteBuffer &) const override { throwNoSerialization(); } void serializeTextEscaped(const IColumn &, size_t, WriteBuffer &) const override { throwNoSerialization(); } void deserializeTextEscaped(IColumn &, ReadBuffer &) const override { throwNoSerialization(); } diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h index e060b22e1b0..eb5154bd231 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h @@ -50,7 +50,7 @@ class BitmapFilter private: void set(std::span row_ids, const FilterPtr & f); - std::vector filter; + IColumn::Filter filter; bool all_match; }; diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp index 2ee899a6097..7aa6db15dab 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp @@ -32,40 +32,18 @@ BitmapFilterBlockInputStream::BitmapFilterBlockInputStream( Block BitmapFilterBlockInputStream::read() { - FilterPtr block_filter = nullptr; - auto block = children.at(0)->read(block_filter, true); + auto block = children.at(0)->read(); if (!block) return block; filter.resize(block.rows()); - bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); - if (!block_filter) - { - if (all_match) - return block; - size_t passed_count = countBytesInFilter(filter); - for (auto & col : block) - { - col.column = col.column->filter(filter, passed_count); - } - } - else + if (bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); all_match) + return block; + + size_t passed_count = countBytesInFilter(filter); + for (auto & col : block) { - RUNTIME_CHECK(filter.size() == block_filter->size(), filter.size(), block_filter->size()); - if (!all_match) - { - std::transform( - filter.begin(), - filter.end(), - block_filter->begin(), - block_filter->begin(), - [](UInt8 a, UInt8 b) { return a && b; }); - } - size_t passed_count = countBytesInFilter(*block_filter); - for (auto & col : block) - { - col.column = col.column->filter(*block_filter, passed_count); - } + col.column = col.column->filter(filter, passed_count); } return block; } diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h index 91122e39ab8..a7ae3d8220c 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h +++ 
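The simplified BitmapFilterBlockInputStream::read above always filters every column by the bitmap slice unless all rows match. A standalone sketch of that inner step, with std::vector standing in for Block and IColumn (names are illustrative, not the TiFlash API):

    #include <cstdint>
    #include <numeric>
    #include <vector>

    using Column = std::vector<int64_t>;

    // Filter all columns of a "block" by a 0/1 byte mask; returns the number of kept rows.
    size_t filterBlock(std::vector<Column> & block_columns, const std::vector<uint8_t> & filter)
    {
        // Equivalent of countBytesInFilter: the mask holds only 0/1 bytes.
        size_t passed = std::accumulate(filter.begin(), filter.end(), size_t{0});
        for (auto & col : block_columns)
        {
            Column out;
            out.reserve(passed);
            for (size_t i = 0; i < filter.size(); ++i)
                if (filter[i])
                    out.push_back(col[i]);
            col = std::move(out);
        }
        return passed;
    }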
diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
index 91122e39ab8..a7ae3d8220c 100644
--- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
+++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
@@ -52,6 +52,14 @@ class BitmapFilterView
         return BitmapFilterView(filter, filter_offset + offset, size);
     }
 
+    IColumn::Filter createRawSubView(UInt32 offset, UInt32 size) const
+    {
+        RUNTIME_CHECK(offset + size <= filter_size, offset, size, filter_size);
+        return IColumn::Filter{
+            filter->filter.data() + filter_offset + offset,
+            filter->filter.data() + filter_offset + offset + size};
+    }
+
     // Caller should ensure n in [0, size).
     inline bool get(UInt32 n) const { return filter->get(filter_offset + n); }
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
index 0142e343a46..851708978d0 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
@@ -178,6 +178,9 @@ class ColumnFileReader
     /// This method is used to skip the next block.
     virtual size_t skipNextBlock() = 0;
 
+    /// This method reads raw data but returns only the rows that pass the filter.
+    virtual Block readWithFilter(const IColumn::Filter & filter) = 0;
+
     /// Create a new reader from current reader with different columns to read.
     virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs, ReadTag read_tag) = 0;
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
index 64469af3f72..4cbfcea364b 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
@@ -363,6 +363,29 @@ Block ColumnFileBigReader::readNextBlock()
     }
 }
 
+Block ColumnFileBigReader::readWithFilter(const IColumn::Filter & filter)
+{
+    initStream();
+
+    if (pk_ver_only)
+    {
+        if (next_block_index_in_cache >= cached_pk_ver_columns.size())
+        {
+            return {};
+        }
+        auto & columns = cached_pk_ver_columns[next_block_index_in_cache];
+        size_t passed_count = countBytesInFilter(filter);
+        for (auto & col : columns)
+            col = col->filter(filter, passed_count);
+        next_block_index_in_cache += 1;
+        return header.cloneWithColumns(std::move(columns));
+    }
+    else
+    {
+        return file_stream->readWithFilter(filter);
+    }
+}
+
 size_t ColumnFileBigReader::skipNextBlock()
 {
     initStream();
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
index eb9f6f90f01..862cce3a647 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
@@ -201,6 +201,8 @@ class ColumnFileBigReader : public ColumnFileReader
 
     Block readNextBlock() override;
 
+    Block readWithFilter(const IColumn::Filter & filter) override;
+
     size_t skipNextBlock() override;
 
     ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag) override;
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp
index 515a4a3c705..b625ade4f18 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <Columns/ColumnsCommon.h>
 #include <...>
 #include <...>
 #include <...>
@@ -139,6 +140,18 @@ Block ColumnFileInMemoryReader::readNextBlock()
     return genBlock(*col_defs, columns);
 }
 
+Block ColumnFileInMemoryReader::readWithFilter(const IColumn::Filter & filter)
+{
+    auto block = readNextBlock();
+    if (size_t passed_count = countBytesInFilter(filter); passed_count != block.rows())
+    {
+        for (auto & col : block)
+        {
+            col.column = col.column->filter(filter, passed_count);
+        }
+    }
+    return block;
+}
 
 size_t ColumnFileInMemoryReader::skipNextBlock()
 {
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
index 5728ef585d6..e26f1078b42 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
@@ -138,6 +138,8 @@ class ColumnFileInMemoryReader : public ColumnFileReader
 
     Block readNextBlock() override;
 
+    Block readWithFilter(const IColumn::Filter & filter) override;
+
     size_t skipNextBlock() override;
 
     ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag) override;
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp
index 3a81960df6b..9a9d49c6848 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp
@@ -121,7 +121,12 @@ void serializeColumn(
     compressed.next();
 }
 
-void deserializeColumn(IColumn & column, const DataTypePtr & type, std::string_view data_buf, size_t rows)
+void deserializeColumn(
+    IColumn & column,
+    const DataTypePtr & type,
+    std::string_view data_buf,
+    size_t rows,
+    const IColumn::Filter * filter)
 {
     ReadBufferFromString buf(data_buf);
     CompressedReadBuffer compressed(buf);
@@ -131,7 +136,8 @@ void deserializeColumn(IColumn & column, const DataTypePtr & type, std::string_view data_buf, size_t rows)
         rows,
         static_cast<double>(data_buf.size()) / rows,
         true,
-        {});
+        {},
+        filter);
 }
 
 void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & column_files)
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h
index 8ee3f2107ed..c33691a6e6b 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h
@@ -56,7 +56,12 @@ void serializeColumn(
     size_t limit,
     CompressionMethod compression_method,
     Int64 compression_level);
-void deserializeColumn(IColumn & column, const DataTypePtr & type, std::string_view data_buf, size_t rows);
+void deserializeColumn(
+    IColumn & column,
+    const DataTypePtr & type,
+    std::string_view data_buf,
+    size_t rows,
+    const IColumn::Filter * filter = nullptr);
 
 /// Serialize those column files' metadata into buf.
 void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & column_files);
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.cpp
index 78b3fd10b29..9c9506079a7 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.cpp
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.cpp
@@ -67,15 +67,27 @@ Block ColumnFileSetInputStream::read(FilterPtr & res_filter, bool)
 
 Block ColumnFileSetInputStream::readWithFilter(const IColumn::Filter & filter)
 {
-    auto block = read();
-    if (size_t passed_count = countBytesInFilter(filter); passed_count != block.rows())
+    while (cur_column_file_reader != reader.column_file_readers.end())
     {
-        for (auto & col : block)
+        if (*cur_column_file_reader == nullptr)
         {
-            col.column = col.column->filter(filter, passed_count);
+            ++cur_column_file_reader;
+            continue;
+        }
+        auto block = (*cur_column_file_reader)->readWithFilter(filter);
+        if (block)
+        {
+            block.setStartOffset(read_rows);
+            read_rows += filter.size();
+            return block;
+        }
+        else
+        {
+            (*cur_column_file_reader).reset();
+            ++cur_column_file_reader;
         }
     }
-    return block;
+    return {};
 }
 
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp
index fe736ef1f99..94b4fb2efff 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp
@@ -50,10 +50,23 @@ SkippableBlockInputStreamPtr ColumnFileSetWithVectorIndexInputStream::tryBuild(
         rest_columns->emplace_back(cd);
     }
 
-    // No vector index column is specified, just use the normal logic.
+    // No vector index column is specified, fall back.
    if (!vec_cd.has_value())
         return fallback();
 
+    bool has_vector_index = false;
+    for (const auto & file : delta_snap->getColumnFiles())
+    {
+        if (auto * tiny_file = file->tryToTinyFile(); tiny_file && tiny_file->hasIndex(ann_query_info->index_id()))
+        {
+            has_vector_index = true;
+            break;
+        }
+    }
+    // No file has a vector index, fall back.
+    if (!has_vector_index)
+        return fallback();
+
     // All check passed. Let's read via vector index.
     return std::make_shared<ColumnFileSetWithVectorIndexInputStream>(
         context,
@@ -68,29 +81,10 @@ SkippableBlockInputStreamPtr ColumnFileSetWithVectorIndexInputStream::tryBuild(
         read_tag_);
 }
 
-Block ColumnFileSetWithVectorIndexInputStream::read(FilterPtr & res_filter, bool return_filter)
-{
-    if (return_filter)
-        return readImpl(res_filter);
-
-    // If return_filter == false, we must filter by ourselves.
-
-    FilterPtr filter = nullptr;
-    auto res = readImpl(filter);
-    if (filter != nullptr)
-    {
-        auto passed_count = countBytesInFilter(*filter);
-        for (auto & col : res)
-            col.column = col.column->filter(*filter, passed_count);
-    }
-    // filter == nullptr means all rows are valid and no need to filter.
-    return res;
-}
-
-Block ColumnFileSetWithVectorIndexInputStream::readOtherColumns()
+Block ColumnFileSetWithVectorIndexInputStream::readOtherColumns(const IColumn::Filter & filter)
 {
     auto reset_column_file_reader = (*cur_column_file_reader)->createNewReader(rest_col_defs, ReadTag::Query);
-    Block block = reset_column_file_reader->readNextBlock();
+    Block block = reset_column_file_reader->readWithFilter(filter);
     return block;
 }
 
@@ -102,7 +96,7 @@ void ColumnFileSetWithVectorIndexInputStream::toNextFile(size_t current_file_index, size_t current_file_rows)
     tiny_readers[current_file_index].reset();
 }
 
-Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter)
+Block ColumnFileSetWithVectorIndexInputStream::read()
 {
     load();
 
@@ -115,10 +109,10 @@ Block ColumnFileSetWithVectorIndexInputStream::read()
             continue;
         }
         auto current_file_index = std::distance(reader.column_file_readers.begin(), cur_column_file_reader);
-        // If has index, we can read the column by vector index.
+        const auto file_rows = column_files[current_file_index]->getRows();
+        // If the file has an index, we can read the column by vector index.
         if (tiny_readers[current_file_index] != nullptr)
         {
-            const auto file_rows = column_files[current_file_index]->getRows();
            auto selected_row_begin = std::lower_bound(sorted_results.cbegin(), sorted_results.cend(), read_rows);
             auto selected_row_end = std::lower_bound(selected_row_begin, sorted_results.cend(), read_rows + file_rows);
             size_t selected_rows = std::distance(selected_row_begin, selected_row_end);
@@ -133,44 +127,36 @@ Block ColumnFileSetWithVectorIndexInputStream::read()
             auto tiny_reader = tiny_readers[current_file_index];
             auto vec_column = vec_cd.type->createColumn();
             const std::span<const UInt32> file_selected_rows{selected_row_begin, selected_row_end};
-            tiny_reader->read(vec_column, file_selected_rows, /* rowid_start_offset= */ read_rows, file_rows);
-            assert(vec_column->size() == file_rows);
+            tiny_reader->read(vec_column, file_selected_rows, /* rowid_start_offset= */ read_rows);
+            assert(vec_column->size() == selected_rows);
 
+            // read other columns if needed
             Block block;
             if (!rest_col_defs->empty())
-            {
-                block = readOtherColumns();
-                assert(block.rows() == vec_column->size());
-            }
-
-            auto index = header.getPositionByName(vec_cd.name);
-            block.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name));
-
-            // Fill res_filter
-            if (selected_rows == file_rows)
-            {
-                res_filter = nullptr;
-            }
-            else
             {
                 filter.clear();
                 filter.resize_fill(file_rows, 0);
                 for (const auto rowid : file_selected_rows)
                     filter[rowid - read_rows] = 1;
-                res_filter = &filter;
+                block = readOtherColumns(filter);
+                assert(block.rows() == selected_rows);
             }
 
+            auto index = header.getPositionByName(vec_cd.name);
+            block.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name));
+
             // All rows in this ColumnFileTiny have been read.
             block.setStartOffset(read_rows);
             toNextFile(current_file_index, file_rows);
             return block;
         }
 
-        auto block = (*cur_column_file_reader)->readNextBlock();
+        // If the file does not have an index, read all valid rows in the file.
+        filter = valid_rows.createRawSubView(read_rows, file_rows);
+        auto block = (*cur_column_file_reader)->readWithFilter(filter);
         if (block)
         {
             block.setStartOffset(read_rows);
-            read_rows += block.rows();
-            res_filter = nullptr;
+            read_rows += file_rows;
             return block;
         }
         else
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h
index 905200f62b7..7ea836aff1d 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h
@@ -92,25 +92,14 @@ class ColumnFileSetWithVectorIndexInputStream : public VectorIndexBlockInputStream
     String getName() const override { return "ColumnFileSetWithVectorIndex"; }
     Block getHeader() const override { return header; }
 
-    Block read() override
-    {
-        FilterPtr filter = nullptr;
-        return read(filter, false);
-    }
-
-    // When all rows in block are not filtered out,
-    // `res_filter` will be set to null.
-    // The caller needs to handle this situation.
-    Block read(FilterPtr & res_filter, bool return_filter) override;
+    Block read() override;
 
     std::vector<VectorIndexViewer::SearchResult> load() override;
 
     void setSelectedRows(const std::span<const UInt32> & selected_rows) override;
 
 private:
-    Block readImpl(FilterPtr & res_filter);
-
-    Block readOtherColumns();
+    Block readOtherColumns(const IColumn::Filter & filter);
 
     void toNextFile(size_t current_file_index, size_t current_file_rows);
 };
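Hypothetical sketch of the rowid-to-filter conversion the vector-index path above relies on: sorted global rowids that fall inside the current file's range [file_begin, file_begin + file_rows) are located with lower_bound and turned into a per-file 0/1 filter, matching the filter.resize_fill and filter[rowid - read_rows] = 1 lines in the diff. Plain std types stand in for the TiFlash ones.

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    std::vector<uint8_t> rowidsToFileFilter(
        const std::vector<uint32_t> & sorted_rowids,
        uint32_t file_begin,
        uint32_t file_rows)
    {
        std::vector<uint8_t> filter(file_rows, 0);
        // Binary-search the window of rowids belonging to this file.
        auto begin = std::lower_bound(sorted_rowids.begin(), sorted_rowids.end(), file_begin);
        auto end = std::lower_bound(begin, sorted_rowids.end(), file_begin + file_rows);
        for (auto it = begin; it != end; ++it)
            filter[*it - file_begin] = 1; // mark selected rows within this file
        return filter;
    }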
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.cpp
index bd0c587f6fc..1e87ac15d3d 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.cpp
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.cpp
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <Columns/ColumnsCommon.h>
 #include <...>
 #include <...>
 #include <...>
@@ -26,7 +28,8 @@ std::pair<ColumnPtr, ColumnPtr> ColumnFileTinyReader::getPKAndVersionColumns()
 {
     if (const size_t cached_columns = cols_data_cache.size(); cached_columns < 2)
     {
-        auto columns = readFromDisk(data_provider, {(*col_defs).begin() + cached_columns, (*col_defs).begin() + 2});
+        auto columns
+            = readFromDisk(data_provider, {(*col_defs).begin() + cached_columns, (*col_defs).begin() + 2}, nullptr);
         cols_data_cache.insert(cols_data_cache.end(), columns.begin(), columns.end());
     }
 
@@ -43,8 +46,10 @@ std::pair<size_t, size_t> ColumnFileTinyReader::readRows(
     const size_t read_columns = output_cols.size();
     if (cached_columns < read_columns)
     {
-        auto columns
-            = readFromDisk(data_provider, {(*col_defs).begin() + cached_columns, (*col_defs).begin() + read_columns});
+        auto columns = readFromDisk(
+            data_provider,
+            {(*col_defs).begin() + cached_columns, (*col_defs).begin() + read_columns},
+            nullptr);
         cols_data_cache.insert(cols_data_cache.end(), columns.begin(), columns.end());
     }
 
 Columns ColumnFileTinyReader::readFromDisk(
     const IColumnFileDataProviderPtr & data_provider,
-    const std::span<const ColumnDefine> & column_defines) const
+    const std::span<const ColumnDefine> & column_defines,
+    const IColumn::Filter * filter) const
 {
     const size_t num_columns_read = column_defines.size();
 
     Columns columns(num_columns_read); // allocate empty columns
+    const size_t passed_count = filter ? countBytesInFilter(filter->data(), tiny_file.rows) : tiny_file.rows;
+
     std::vector<size_t> fields;
     const auto & colid_to_offset = tiny_file.schema->getColIdToOffset();
     for (size_t index = 0; index < num_columns_read; ++index)
     {
         else
         {
             // New column after ddl does not exist in this CFTiny, fill with default value
-            columns[index] = createColumnWithDefaultValue(cd, tiny_file.rows);
+            columns[index] = createColumnWithDefaultValue(cd, passed_count);
         }
     }
 
         // Deserialize column by pack's schema
         const auto & type = tiny_file.getDataType(cd.id);
         auto col_data = type->createColumn();
-        deserializeColumn(*col_data, type, data_buf, tiny_file.rows);
+        deserializeColumn(*col_data, type, data_buf, tiny_file.rows, filter);
 
         columns[index] = convertColumnByColumnDefineIfNeed(type, std::move(col_data), cd);
     }
@@ -118,7 +126,18 @@ Block ColumnFileTinyReader::readNextBlock()
     if (read_done)
         return {};
 
-    auto columns = readFromDisk(data_provider, *col_defs);
+    auto columns = readFromDisk(data_provider, *col_defs, nullptr);
+
+    read_done = true;
+    return genBlock(*col_defs, columns);
+}
+
+Block ColumnFileTinyReader::readWithFilter(const IColumn::Filter & filter)
+{
+    if (read_done)
+        return {};
+
+    auto columns = readFromDisk(data_provider, *col_defs, &filter);
 
     read_done = true;
     return genBlock(*col_defs, columns);
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h
index 8bb8c0916c8..796328c39fb 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h
@@ -65,10 +65,13 @@ class ColumnFileTinyReader : public ColumnFileReader
 
     Columns readFromDisk(
         const IColumnFileDataProviderPtr & data_provider,
-        const std::span<const ColumnDefine> & column_defines) const;
+        const std::span<const ColumnDefine> & column_defines,
+        const IColumn::Filter * filter) const;
 
     Block readNextBlock() override;
 
+    Block readWithFilter(const IColumn::Filter & filter) override;
+
     size_t skipNextBlock() override;
 
     ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag) override;
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp
index 079c14ae9ca..fe07ad9da12 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp
@@ -27,32 +27,18 @@ namespace DB::DM
 
 void ColumnFileTinyVectorIndexReader::read(
     MutableColumnPtr & vec_column,
     const std::span<const UInt32> & read_rowids,
-    size_t rowid_start_offset,
-    size_t read_rows)
+    size_t rowid_start_offset)
 {
     RUNTIME_CHECK(loaded);
 
     Stopwatch watch;
-    vec_column->reserve(read_rows);
+    vec_column->reserve(read_rowids.size());
     std::vector<Float32> value;
-    size_t current_rowid = rowid_start_offset;
     for (const auto & rowid : read_rowids)
     {
         // Each ColumnFileTiny has its own vector index; rowid_start_offset is the offset of the ColumnFilePersistedSet.
         vec_index->get(rowid - rowid_start_offset, value);
-        if (rowid > current_rowid)
-        {
-            UInt32 nulls = rowid - current_rowid;
-            // Insert [] if column is Not Null, or NULL if column is Nullable
-            vec_column->insertManyDefaults(nulls);
-        }
         vec_column->insertData(reinterpret_cast<const char *>(value.data()), value.size() * sizeof(Float32));
-        current_rowid = rowid + 1;
-    }
-    if (current_rowid < rowid_start_offset + read_rows)
-    {
-        UInt32 nulls = rowid_start_offset + read_rows - current_rowid;
-        vec_column->insertManyDefaults(nulls);
     }
 
     perf_stat.returned_rows = read_rowids.size();
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h
index 993b0a072a3..cb6c09c6576 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h
@@ -74,15 +74,11 @@ class ColumnFileTinyVectorIndexReader
 
     ~ColumnFileTinyVectorIndexReader();
 
-    // Read vector column data and set filter.
-    // The column will have as many rows as the tiny file,
-    // but only the rows in selected_rows will be filled,
-    // others will be filled with default values.
+    // Read vector column data with the specified rowids.
     void read(
         MutableColumnPtr & vec_column,
         const std::span<const UInt32> & read_rowids,
-        size_t rowid_start_offset,
-        size_t read_rows);
+        size_t rowid_start_offset);
 
     // Load vector index and search results.
     // Return the rowids of the selected rows.
diff --git a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp
index 33206792603..f3590d62401 100644
--- a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp
+++ b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <Columns/ColumnsCommon.h>
 #include <...>
 #include <...>
 
@@ -132,13 +133,13 @@ Block ConcatSkippableBlockInputStream<need_row_id>::readWithFilter(const IColumn::Filter & filter)
 }
 
 template <bool need_row_id>
-Block ConcatSkippableBlockInputStream<need_row_id>::read(FilterPtr & res_filter, bool return_filter)
+Block ConcatSkippableBlockInputStream<need_row_id>::read()
 {
     Block res;
 
     while (current_stream != children.end())
     {
-        res = (*current_stream)->read(res_filter, return_filter);
+        res = (*current_stream)->read();
 
         if (res)
         {
             res.setStartOffset(res.startOffset() + precede_stream_rows);
@@ -239,12 +240,33 @@ void ConcatVectorIndexBlockInputStream::load()
         sr_it = end;
     }
 
-    // Not used anymore, release memory.
-    index_streams.clear();
-
     loaded = true;
 }
 
+Block ConcatVectorIndexBlockInputStream::read()
+{
+    load();
+    auto block = stream->read();
+
+    // For streams that are not VectorIndexBlockInputStream, the block should be filtered by the bitmap.
+    if (auto index = std::distance(stream->children.begin(), stream->current_stream); !index_streams[index])
+    {
+        filter.resize(block.rows());
+        if (bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); all_match)
+            return block;
+
+        size_t passed_count = countBytesInFilter(filter);
+        for (auto & col : block)
+        {
+            col.column = col.column->filter(filter, passed_count);
+        }
+    }
+
+    return block;
+}
+
 SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
+    const BitmapFilterPtr & bitmap_filter,
     std::shared_ptr<ConcatSkippableBlockInputStream<false>> stream,
     const ANNQueryInfoPtr & ann_query_info)
 {
@@ -267,6 +289,7 @@ SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
         return stream;
 
     return std::make_shared<ConcatVectorIndexBlockInputStream>(
+        bitmap_filter,
         stream,
         std::move(index_streams),
         ann_query_info->top_k());
diff --git a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h
index 5a17d44ec85..d3db09d0f7a 100644
--- a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h
@@ -47,13 +47,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
 
     Block readWithFilter(const IColumn::Filter & filter) override;
 
-    Block read() override
-    {
-        FilterPtr filter = nullptr;
-        return read(filter, false);
-    }
-
-    Block read(FilterPtr & res_filter, bool return_filter) override;
+    Block read() override;
 
 private:
     friend class ConcatVectorIndexBlockInputStream;
@@ -72,15 +66,18 @@ class ConcatVectorIndexBlockInputStream : public SkippableBlockInputStream
 {
 public:
     ConcatVectorIndexBlockInputStream(
+        const BitmapFilterPtr & bitmap_filter_,
        std::shared_ptr<ConcatSkippableBlockInputStream<false>> stream,
         std::vector<VectorIndexBlockInputStream *> && index_streams,
         UInt32 topk_)
         : stream(std::move(stream))
         , index_streams(std::move(index_streams))
         , topk(topk_)
+        , bitmap_filter(bitmap_filter_)
     {}
 
     static SkippableBlockInputStreamPtr build(
+        const BitmapFilterPtr & bitmap_filter,
         std::shared_ptr<ConcatSkippableBlockInputStream<false>> stream,
         const ANNQueryInfoPtr & ann_query_info);
 
@@ -97,17 +94,7 @@ class ConcatVectorIndexBlockInputStream : public SkippableBlockInputStream
         throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
     }
 
-    Block read() override
-    {
-        FilterPtr filter = nullptr;
-        return read(filter, false);
-    }
-
-    Block read(FilterPtr & res_filter, bool return_filter) override
-    {
-        load();
-        return stream->read(res_filter, return_filter);
-    }
+    Block read() override;
 
 private:
     void load();
 
     std::vector<VectorIndexBlockInputStream *> index_streams;
     UInt32 topk = 0;
     bool loaded = false;
+
+    BitmapFilterPtr bitmap_filter;
+    IColumn::Filter filter;
 };
 
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
index e2b28f896e8..6a87f0df197 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
+++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
@@ -19,6 +19,8 @@
 #include <...>
 #include <...>
 
+#include <limits>
+
 namespace DB::DM
 {
@@ -212,8 +214,8 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIndex(
         max_read_buffer_size,
         file_provider,
         read_limiter,
-        rows_threshold_per_read,
-        read_one_pack_every_time,
+        std::numeric_limits<size_t>::max(), // do not limit the rows
+        false, // read multiple packs at once
         tracing_id,
         enable_read_thread,
         scan_context,
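The DMFileReader::readWithFilter change below groups consecutive "used" packs and issues a single read per run instead of one read per pack. A simplified model of that grouping logic, with a callback standing in for the reader state (all names here are illustrative, not the TiFlash API):

    #include <cstddef>
    #include <functional>
    #include <vector>

    // Invoke read_run once per maximal run of consecutive used packs,
    // passing the first pack, the pack count, and the total row count of the run.
    void forEachUsedRun(
        const std::vector<bool> & pack_used,
        const std::vector<size_t> & pack_rows,
        const std::function<void(size_t first_pack, size_t pack_count, size_t rows)> & read_run)
    {
        size_t run_begin = 0;
        size_t rows = 0;
        for (size_t i = 0; i < pack_used.size(); ++i)
        {
            if (!pack_used[i])
            {
                run_begin = i + 1; // restart the run after an unused pack
                rows = 0;
                continue;
            }
            rows += pack_rows[i];
            // A run ends at the last pack, or when the next pack is unused.
            if (i + 1 == pack_used.size() || !pack_used[i + 1])
                read_run(run_begin, i - run_begin + 1, rows);
        }
    }

In the diff this shape appears as the accumulating read_rows counter plus the condition pack_res[pack_id].isUse() && (pack_id + 1 == pack_res.size() || !pack_res[pack_id + 1].isUse()).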
-        auto column_all_data
-            = column_cache_long_term->get(dmfile->parentPath(), dmfile->fileId(), cd.id, [&]() -> IColumn::Ptr {
-                  // Always read all packs when filling cache
-                  return readFromDiskOrSharingCache(cd, type_on_disk, 0, dmfile->getPacks(), dmfile->getRows());
-              });
+        auto column_all_data = column_cache_long_term->get(
+            dmfile->parentPath(),
+            dmfile->fileId(),
+            cd.id,
+            [&]() -> IColumn::Ptr {
+                // Always read all packs when filling the cache, and always set filter to nullptr
+                return readFromDiskOrSharingCache(cd, type_on_disk, 0, dmfile->getPacks(), dmfile->getRows(), nullptr);
+            });

         auto column = type_on_disk->createColumn();
         column->insertRangeFrom(*column_all_data, next_row_offset - read_rows, read_rows);
@@ -536,7 +543,7 @@ ColumnPtr DMFileReader::readColumn(const ColumnDefine & cd, size_t start_pack_id
     // Not cached
     if (!enable_column_cache || !isCacheableColumn(cd))
     {
-        auto column = readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows);
+        auto column = readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows, filter);
         return convertColumnByColumnDefineIfNeed(type_on_disk, std::move(column), cd);
     }
@@ -555,7 +562,8 @@ ColumnPtr DMFileReader::readColumn(const ColumnDefine & cd, size_t start_pack_id
         size_t start_pack_id,
         size_t pack_count,
         size_t read_rows) {
-        return readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows);
+        // Read from cache, so always set filter to nullptr
+        return readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows, nullptr);
     });
     // add column to cache
     addColumnToCache(column_cache, cd.id, start_pack_id, pack_count, column);
@@ -567,7 +575,8 @@ ColumnPtr DMFileReader::readFromDisk(
     const ColumnDefine & cd,
     const DataTypePtr & type_on_disk,
     size_t start_pack_id,
-    size_t read_rows)
+    size_t read_rows,
+    IColumn::Filter * filter)
 {
     const auto stream_name = DMFile::getFileNameBase(cd.id);
     auto iter = column_streams.find(stream_name);
@@ -593,7 +602,8 @@ ColumnPtr DMFileReader::readFromDisk(
         read_rows,
         top_stream->avg_size_hint,
         true,
-        {});
+        {},
+        filter);
     IDataType::updateAvgValueSizeHint(*mutable_col, top_stream->avg_size_hint);
     return mutable_col;
 }
@@ -603,7 +613,8 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache(
     const DataTypePtr & type_on_disk,
     size_t start_pack_id,
     size_t pack_count,
-    size_t read_rows)
+    size_t read_rows,
+    IColumn::Filter * filter)
 {
     bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this);
     bool reach_sharing_column_memory_limit = shared_column_data_mem_tracker != nullptr
     {
         GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment();
     }
-    bool enable_sharing_column = has_concurrent_reader && !reach_sharing_column_memory_limit;
-    ColumnPtr column;
-    if (enable_sharing_column)
+    if (bool enable_sharing_column = has_concurrent_reader && !reach_sharing_column_memory_limit; enable_sharing_column)
     {
-        column = getColumnFromCache(
+        auto column = getColumnFromCache(
             data_sharing_col_data_cache,
             cd,
             type_on_disk,
@@ -633,22 +642,18 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache(
     // This can lead to inaccurate memory statistics of MemoryTracker.
     // To solve this problem, we use an independent global memory tracker to track the shared column data in the data_sharing_col_data_cache.
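Both cache-filling paths above pass nullptr for the filter, and the reason generalizes: a cache entry must hold every row of the range it claims to cover, because the next reader may arrive with a different filter or none at all; filtering happens only after the full data is materialized. A rough sketch of that invariant, with hypothetical names (Cache, getOrLoad, readRange) rather than TiFlash's real API:

```cpp
#include <functional>
#include <map>
#include <vector>

using Column = std::vector<int>;

class Cache
{
public:
    // The loader must return *all* rows; a pre-filtered column would be
    // wrong for the next caller, whose filter may differ.
    const Column & getOrLoad(int col_id, const std::function<Column()> & load_all)
    {
        auto it = entries.find(col_id);
        if (it == entries.end())
            it = entries.emplace(col_id, load_all()).first;
        return it->second;
    }

private:
    std::map<int, Column> entries;
};

// Caller side: fetch the full column from the cache, then cut out the
// rows of the current read range [start, start + count).
inline Column readRange(
    Cache & cache,
    int col_id,
    size_t start,
    size_t count,
    const std::function<Column()> & load_all)
{
    const Column & all = cache.getOrLoad(col_id, load_all);
    return Column(all.begin() + start, all.begin() + start + count);
}
```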
            MemoryTrackerSetter mem_tracker_guard(true, nullptr);
-            return readFromDisk(cd, type_on_disk, start_pack_id, read_rows);
+            // If there are concurrent read requests, read the column from disk and share it; always set filter to nullptr.
+            return readFromDisk(cd, type_on_disk, start_pack_id, read_rows, nullptr);
         });
-    }
-    else
-    {
-        column = readFromDisk(cd, type_on_disk, start_pack_id, read_rows);
-    }
-
-    // Set the column to DMFileReaderPool to share the column data.
-    if (enable_sharing_column && data_sharing_col_data_cache != nullptr)
-    {
+        RUNTIME_CHECK(data_sharing_col_data_cache != nullptr);
+        // Set the column to DMFileReaderPool to share the column data.
         DMFileReaderPool::instance().set(*this, cd.id, start_pack_id, pack_count, column);
         // Delete column from local cache since it is not used anymore.
         data_sharing_col_data_cache->delColumn(cd.id, next_pack_id);
+        return column;
     }
-    return column;
+
+    return readFromDisk(cd, type_on_disk, start_pack_id, read_rows, filter);
 }

 void DMFileReader::addColumnToCache(
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h
index 2cba9b63c87..88bd6bf1b74 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h
+++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h
@@ -87,7 +87,7 @@ class DMFileReader
     /// Read specified rows.
     Block readWithFilter(const IColumn::Filter & filter);

-    Block read();
+    Block read(IColumn::Filter * filter = nullptr);

     std::string path() const {
         // Status of the DMFile can be updated while the DMFileReader is in use, and the pathname will change accordingly.
@@ -106,19 +106,27 @@ class DMFileReader
         size_t start_pack_id,
         size_t pack_count,
         size_t read_rows,
-        const std::vector & clean_read_packs);
+        const std::vector & clean_read_packs,
+        IColumn::Filter * filter);
     ColumnPtr readFromDisk(
         const ColumnDefine & cd,
         const DataTypePtr & type_on_disk,
         size_t start_pack_id,
-        size_t read_rows);
+        size_t read_rows,
+        IColumn::Filter * filter);
     ColumnPtr readFromDiskOrSharingCache(
         const ColumnDefine & cd,
         const DataTypePtr & type_on_disk,
         size_t start_pack_id,
         size_t pack_count,
-        size_t read_rows);
-    ColumnPtr readColumn(const ColumnDefine & cd, size_t start_pack_id, size_t pack_count, size_t read_rows);
+        size_t read_rows,
+        IColumn::Filter * filter);
+    ColumnPtr readColumn(
+        const ColumnDefine & cd,
+        size_t start_pack_id,
+        size_t pack_count,
+        size_t read_rows,
+        IColumn::Filter * filter);
     ColumnPtr cleanRead(
         const ColumnDefine & cd,
         size_t rows_count,
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp
index 9cbfedba955..f25090b390a 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp
+++ b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp
@@ -198,33 +198,19 @@ std::vector DMFileVectorIndexReader::loadVector
 void DMFileVectorIndexReader::read(
     MutableColumnPtr & vec_column,
-    const std::span & selected_rows,
-    size_t start_offset,
-    size_t column_size)
+    const std::span & selected_rows)
 {
     Stopwatch watch;
     RUNTIME_CHECK(loaded);
-    vec_column->reserve(column_size);
+    vec_column->reserve(selected_rows.size());
     std::vector value;
-    size_t current_rowid = start_offset;
     for (auto rowid : selected_rows)
     {
         vec_index->get(rowid, value);
-        if (rowid > current_rowid)
-        {
-            UInt32 nulls = rowid - current_rowid;
-            // Insert [] if column is Not Null, or NULL if column is Nullable
-            vec_column->insertManyDefaults(nulls);
-        }
        vec_column->insertData(reinterpret_cast(value.data()), value.size() * sizeof(Float32));
-        current_rowid = rowid + 1;
-    }
-    if (current_rowid < start_offset + column_size)
-    {
-        UInt32 nulls = column_size + start_offset - current_rowid;
-        vec_column->insertManyDefaults(nulls);
     }
+
     perf_stat.duration_read_vec_column += watch.elapsedSeconds();
 }

diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h
index b19850a1180..3c8a3630e0b 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h
+++ b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h
@@ -70,16 +70,8 @@ class DMFileVectorIndexReader

     ~DMFileVectorIndexReader();

-    // Read vector column data and set filter.
-    // The column will be as same as as the rows of the tiny file,
-    // but only the rows in sorted_results will be filled,
-    // others will be filled with default values.
-    // return the real number of rows read.
-    void read(
-        MutableColumnPtr & vec_column,
-        const std::span & selected_rows,
-        size_t start_offset,
-        size_t column_size);
+    // Read vector column data for the specified rowids.
+    void read(MutableColumnPtr & vec_column, const std::span & selected_rows);

     // Load vector index and search results.
     // Return the rowids of the selected rows.
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp
index 4f008c7454f..d2ef809ac4b 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp
+++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp
@@ -65,75 +65,10 @@ DMFileWithVectorIndexBlockInputStream::~DMFileWithVectorIndexBlockInputStream()
         valid_packs_after_search);
 }

-Block DMFileWithVectorIndexBlockInputStream::read(FilterPtr & res_filter, bool return_filter)
-{
-    if (return_filter)
-        return readImpl(res_filter);
-
-    // If return_filter == false, we must filter by ourselves.
-
-    FilterPtr filter = nullptr;
-    auto res = readImpl(filter);
-    if (filter != nullptr)
-    {
-        for (auto & col : res)
-            col.column = col.column->filter(*filter, -1);
-    }
-    // filter == nullptr means all rows are valid and no need to filter.
-
-    return res;
-}
-
-Block DMFileWithVectorIndexBlockInputStream::readImpl(FilterPtr & res_filter)
+Block DMFileWithVectorIndexBlockInputStream::read()
 {
     internalLoad();

-    auto [res, real_rows] = reader.read_columns.empty() ? readByIndexReader() : readByFollowingOtherColumns();
-
-    if (!res)
-        return {};
-
-    // If all rows are valid, res_filter is nullptr.
-    if (real_rows == res.rows())
-    {
-        res_filter = nullptr;
-        return res;
-    }
-
-    // Assign output filter according to sorted_results.
-    //
-    // For example, if sorted_results is [3, 10], the complete filter array is:
-    // [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1]
-    // And we should only return filter array starting from res.startOffset(), like:
-    // [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1]
-    //     ^startOffset  ^startOffset+rows
-    //  filter: [0, 0, 0, 0, 0]
-    //
-    // We will use startOffset as lowerBound (inclusive), ans startOffset+rows
-    // as upperBound (exclusive) to find whether this range has a match in sorted_results.
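With the default-padding logic removed, the new read() contract is simply a gather: one output row per selected rowid, in ascending order. A self-contained sketch of the idea; VectorIndex and its get() are stand-ins for the real index type, and the column is modeled as a flat float buffer:

```cpp
#include <cstdint>
#include <span>
#include <vector>

struct VectorIndex
{
    // Fetches the vector for a row id (stands in for vec_index->get).
    void get(uint32_t rowid, std::vector<float> & out) const
    {
        out.assign(dim, float(rowid)); // dummy data for the sketch
    }
    size_t dim = 4;
};

// Result is a flat buffer of selected_rows.size() * dim floats; the old
// code instead produced one row for every row in the pack range, padding
// unselected rows with defaults.
inline std::vector<float> readSelected(const VectorIndex & index, std::span<const uint32_t> selected_rows)
{
    std::vector<float> column;
    column.reserve(selected_rows.size() * index.dim);
    std::vector<float> value;
    for (uint32_t rowid : selected_rows)
    {
        index.get(rowid, value);
        column.insert(column.end(), value.begin(), value.end());
    }
    return column;
}
```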
-
-    const auto start_offset = res.startOffset();
-    const auto max_rowid_exclusive = start_offset + res.rows();
-
-    filter.clear();
-    filter.resize_fill(res.rows(), 0);
-
-    auto it = std::lower_bound(sorted_results.begin(), sorted_results.end(), start_offset);
-    while (it != sorted_results.end())
-    {
-        auto rowid = *it;
-        if (rowid >= max_rowid_exclusive)
-            break;
-        filter[rowid - start_offset] = 1;
-        ++it;
-    }
-
-    res_filter = &filter;
-    return res;
-}
-
-std::tuple DMFileWithVectorIndexBlockInputStream::readByIndexReader()
-{
     const auto & pack_stats = dmfile->getPackStats();
     size_t all_packs = pack_stats.size();
     const auto & pack_res = reader.pack_filter.getPackResConst();
@@ -149,8 +84,8 @@ std::tuple DMFileWithVectorIndexBlockInputStream::readByIndexRead
         ++index_reader_next_pack_id;
     }

+    // Finished
     if (index_reader_next_pack_id >= all_packs)
-        // Finished
         return {};

     auto block_start_row_id = index_reader_next_row_id;
@@ -162,50 +97,42 @@ std::tuple DMFileWithVectorIndexBlockInputStream::readByIndexRead
         ++index_reader_next_pack_id;
     }

-    Block block;
-    block.setStartOffset(block_start_row_id);
-
-    size_t read_rows = index_reader_next_row_id - block_start_row_id;
     auto vec_column = vec_cd.type->createColumn();
-
     auto begin = std::lower_bound(sorted_results.cbegin(), sorted_results.cend(), block_start_row_id);
     auto end = std::lower_bound(begin, sorted_results.cend(), index_reader_next_row_id);
     const std::span block_selected_rows{begin, end};
-    vec_index_reader->read(vec_column, block_selected_rows, block_start_row_id, read_rows);
+    if (block_selected_rows.empty())
+        return {};

-    block.insert(ColumnWithTypeAndName{std::move(vec_column), vec_cd.type, vec_cd.name, vec_cd.id});
-    return {block, block_selected_rows.size()};
-}
+    // read vector column
+    vec_index_reader->read(vec_column, block_selected_rows);

-std::tuple DMFileWithVectorIndexBlockInputStream::readByFollowingOtherColumns()
-{
-    // First read other columns.
-    Stopwatch w;
-    auto block_others = reader.read();
-    duration_read_from_other_columns_seconds += w.elapsedSeconds();
+    Block block;
+    block.setStartOffset(block_start_row_id);

-    if (!block_others)
-        return {};
+    // read other columns if needed
+    if (!reader.read_columns.empty())
+    {
+        // FIXME(filtering):
+        // After vector search supports filtering, RSResult will limit the rows to read (DMFileReader::getReadPackLimit).
+        // But in the current implementation, we try to read all continuous used packs at once.
+        // This may make filter->size() bigger than expected. We need to fix this if we support filtering.

-    auto read_rows = block_others.rows();
+        // FIXME: reading too many packs at once causes a performance regression.
+        Stopwatch w;

-    // Using vec_cd.type to construct a Column directly instead of using
-    // the type from dmfile, so that we don't need extra transforms
-    // (e.g. wrap with a Nullable). vec_index_reader is compatible with
-    // both Nullable and NotNullable.
-    auto vec_column = vec_cd.type->createColumn();
+        filter.clear();
+        filter.resize_fill(index_reader_next_row_id - block_start_row_id, 0);
+        for (const auto rowid : block_selected_rows)
+            filter[rowid - block_start_row_id] = 1;

-    // Then read from vector index for the same pack.
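The filter construction above relies on sorted_results being globally sorted: two lower_bound calls cut out the slice of rowids that falls inside the current block, and each selected rowid is then marked relative to the block start. A compact sketch of the same computation, hedged as an illustration rather than the exact TiFlash code:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

inline std::vector<uint8_t> makeBlockFilter(
    const std::vector<uint32_t> & sorted_results, // ascending rowids
    uint32_t block_start,
    uint32_t block_end) // exclusive
{
    // Slice of sorted_results that falls into [block_start, block_end).
    auto begin = std::lower_bound(sorted_results.begin(), sorted_results.end(), block_start);
    auto end = std::lower_bound(begin, sorted_results.end(), block_end);

    // Mark each selected rowid relative to the block start.
    std::vector<uint8_t> filter(block_end - block_start, 0);
    for (auto it = begin; it != end; ++it)
        filter[*it - block_start] = 1;
    return filter;
}

// e.g. sorted_results = {3, 10}, block [0, 5) -> filter {0, 0, 0, 1, 0}
```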
-    auto begin = std::lower_bound(sorted_results.cbegin(), sorted_results.cend(), block_others.startOffset());
-    auto end = std::lower_bound(begin, sorted_results.cend(), block_others.startOffset() + read_rows);
-    const std::span block_selected_rows{begin, end};
-    vec_index_reader->read(vec_column, block_selected_rows, block_others.startOffset(), read_rows);
+        block = reader.readWithFilter(filter);
+        duration_read_from_other_columns_seconds += w.elapsedSeconds();
+    }

-    // Re-assemble block using the same layout as header.
-    // Insert the vector column into the block.
     auto index = header.getPositionByName(vec_cd.name);
-    block_others.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name));
-    return {block_others, block_selected_rows.size()};
+    block.insert(index, ColumnWithTypeAndName{std::move(vec_column), vec_cd.type, vec_cd.name, vec_cd.id});
+    return block;
 }

 std::vector DMFileWithVectorIndexBlockInputStream::load()
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h
index d77ff9408af..609a357af2d 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h
@@ -90,21 +90,7 @@ class DMFileWithVectorIndexBlockInputStream : public VectorIndexBlockInputStream
     ~DMFileWithVectorIndexBlockInputStream() override;

 public:
-    Block read() override
-    {
-        FilterPtr filter = nullptr;
-        return read(filter, false);
-    }
-
-    // When all rows in block are not filtered out,
-    // `res_filter` will be set to null.
-    // The caller needs to do handle this situation.
-    Block read(FilterPtr & res_filter, bool return_filter) override;
-
-    // When all rows in block are not filtered out,
-    // `res_filter` will be set to null.
-    // The caller needs to do handle this situation.
-    Block readImpl(FilterPtr & res_filter);
+    Block read() override;

     String getName() const override { return "DMFileWithVectorIndex"; }

@@ -115,19 +101,9 @@ class DMFileWithVectorIndexBlockInputStream : public VectorIndexBlockInputStream
     void setSelectedRows(const std::span & selected_rows) override;

 private:
-    // Only used in readByIndexReader()
     size_t index_reader_next_pack_id = 0;
-    // Only used in readByIndexReader()
     size_t index_reader_next_row_id = 0;

-    // Read data totally from the VectorColumnFromIndexReader. This is used
-    // when there is no other column to read.
-    std::tuple readByIndexReader();
-
-    // Read data from other columns first, then read from VectorColumnFromIndexReader. This is used
-    // when there are other columns to read.
-    std::tuple readByFollowingOtherColumns();
-
     // Load vector index and update sorted_results.
    void internalLoad();
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp
index 201c2075113..6ee1df53fa5 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -3345,7 +3345,7 @@ SkippableBlockInputStreamPtr Segment::getConcatSkippableBlockInputStream(
     assert(stream != nullptr);
     stream->appendChild(persisted_files_stream, persisted_files->getRows());
     stream->appendChild(mem_table_stream, memtable->getRows());
-    return ConcatVectorIndexBlockInputStream::build(stream, ann_query_info);
+    return ConcatVectorIndexBlockInputStream::build(bitmap_filter, stream, ann_query_info);
 }

 BlockInputStreamPtr Segment::getLateMaterializationStream(
@@ -3509,7 +3509,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream(
             read_data_block_rows);
     }

-    auto skippable_stream = getConcatSkippableBlockInputStream(
+    auto stream = getConcatSkippableBlockInputStream(
         bitmap_filter,
         segment_snap,
         dm_context,
@@ -3519,8 +3519,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream(
         start_ts,
         read_data_block_rows,
         ReadTag::Query);
-    auto stream = std::make_shared(columns_to_read, skippable_stream, bitmap_filter);
-    if (auto * vector_index_stream = dynamic_cast(skippable_stream.get());
+    if (auto * vector_index_stream = dynamic_cast(stream.get());
         vector_index_stream)
     {
         // For vector search, there are more likely to return small blocks from different
@@ -3532,7 +3531,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream(
             /*min_block_size_bytes=*/0,
             dm_context.tracing_id);
     }
-    return stream;
+    return std::make_shared(columns_to_read, stream, bitmap_filter);
 }

 // clipBlockRows tries to limit the block size so that it does not exceed settings.max_block_bytes.
diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp
index ff14538d9b2..13d39717b95 100644
--- a/dbms/src/Storages/StorageLog.cpp
+++ b/dbms/src/Storages/StorageLog.cpp
@@ -78,7 +78,7 @@ class LogBlockInputStream final : public IProfilingBlockInputStream
         res.insert({name_type.type->createColumn(), name_type.type, name_type.name});

         return Nested::flatten(res);
-    };
+    }

 protected:
     Block readImpl() override;
@@ -262,13 +262,8 @@ void LogBlockInputStream::readData(
         return &it->second.compressed;
     };

-    type.deserializeBinaryBulkWithMultipleStreams(
-        column,
-        stream_getter,
-        max_rows_to_read,
-        0,
-        true,
-        {}); /// TODO Use avg_value_size_hint.
+    /// TODO Use avg_value_size_hint.
+    type.deserializeBinaryBulkWithMultipleStreams(column, stream_getter, max_rows_to_read, 0, true, {});
 }

@@ -405,7 +400,7 @@ StorageLog::StorageLog(
     , file_checker(path + escapeForFileName(name) + '/' + "sizes.json")
 {
     if (path.empty())
-        throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
+        throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Storage {} requires data path", NAME);

     /// create files if they do not exist
     Poco::File(path + escapeForFileName(name) + '/').createDirectories();
diff --git a/dbms/src/Storages/StorageLog.h b/dbms/src/Storages/StorageLog.h
index ce6c2c1b807..20ffe3a6011 100644
--- a/dbms/src/Storages/StorageLog.h
+++ b/dbms/src/Storages/StorageLog.h
@@ -37,7 +37,8 @@ class StorageLog
     friend class LogBlockOutputStream;

 public:
-    std::string getName() const override { return "Log"; }
+    static constexpr auto NAME = "Log";
+    std::string getName() const override { return NAME; }
     std::string getTableName() const override { return name; }

     BlockInputStreams read(
@@ -55,9 +56,9 @@ class StorageLog

     bool checkData() const override;

-    std::string full_path() const { return path + escapeForFileName(name) + '/'; }
+    std::string fullPath() const { return path + escapeForFileName(name) + '/'; }

-    String getDataPath() const override { return full_path(); }
+    String getDataPath() const override { return fullPath(); }

 protected:
     /** Attach the table with the appropriate name, along the appropriate path (with / at the end),