Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vector: optimize read performance #9687

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dbms/src/DataTypes/DataTypeAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ void DataTypeAggregateFunction::deserializeBinaryBulk(
IColumn & column,
ReadBuffer & istr,
size_t limit,
double /*avg_value_size_hint*/) const
double /*avg_value_size_hint*/,
const IColumn::Filter * filter) const
{
ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(column);
ColumnAggregateFunction::Container & vec = real_column.getData();
Expand All @@ -145,6 +146,9 @@ void DataTypeAggregateFunction::deserializeBinaryBulk(
if (istr.eof())
break;

// Actually, there is no need to deserialize each row when filter is provided.
// But since we do not know the size of serialized state, we have to deserialize it.
// TODO: Optimize this.
AggregateDataPtr place = arena.alloc(size_of_state);

function->create(place);
Expand All @@ -159,7 +163,10 @@ void DataTypeAggregateFunction::deserializeBinaryBulk(
throw;
}

vec.push_back(place);
if (filter && !(*filter)[i])
function->destroy(place);
else
vec.push_back(place);
}
}

Expand Down
8 changes: 6 additions & 2 deletions dbms/src/DataTypes/DataTypeAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ class DataTypeAggregateFunction final : public IDataType
void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
const override;
void deserializeBinaryBulk(
IColumn & column,
ReadBuffer & istr,
size_t limit,
double avg_value_size_hint,
const IColumn::Filter * filter) const override;
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr) const override;
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/DataTypes/DataTypeArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
size_t limit,
double /*avg_value_size_hint*/,
bool position_independent_encoding,
SubstreamPath & path) const
SubstreamPath & path,
const IColumn::Filter * filter) const
{
ColumnArray & column_array = typeid_cast<ColumnArray &>(column);

Expand All @@ -245,7 +246,7 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
"try to deserialize Array type to non-empty column without position independent encoding, type_name={}",
getName());
DataTypeNumber<ColumnArray::Offset>()
.deserializeBinaryBulk(column_array.getOffsetsColumn(), *stream, limit, 0);
.deserializeBinaryBulk(column_array.getOffsetsColumn(), *stream, limit, 0, filter);
}
}

Expand All @@ -269,7 +270,8 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
nested_limit,
0,
position_independent_encoding,
path);
path,
filter);

/// Check consistency between offsets and elements subcolumns.
/// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER.
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/DataTypes/DataTypeArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class DataTypeArray final : public IDataType
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath & path) const override;
SubstreamPath & path,
const IColumn::Filter * filter) const override;

MutableColumnPtr createColumn() const override;

Expand Down
50 changes: 44 additions & 6 deletions dbms/src/DataTypes/DataTypeDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/IColumn.h>
#include <Common/typeid_cast.h>
#include <Core/Field.h>
Expand Down Expand Up @@ -79,13 +80,50 @@ void DataTypeDecimal<T>::deserializeBinaryBulk(
IColumn & column,
ReadBuffer & istr,
size_t limit,
double /*avg_value_size_hint*/) const
double /*avg_value_size_hint*/,
const IColumn::Filter * filter) const
{
typename ColumnType::Container & x = typeid_cast<ColumnType &>(column).getData();
size_t initial_size = x.size();
x.resize(initial_size + limit);
size_t size = istr.readBig(reinterpret_cast<char *>(&x[initial_size]), sizeof(FieldType) * limit);
x.resize(initial_size + size / sizeof(FieldType));
auto & x = typeid_cast<ColumnType &>(column).getData();
size_t current_size = x.size();
if (!filter)
{
x.resize(current_size + limit);
size_t size = istr.readBig(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * limit);
x.resize(current_size + size / sizeof(FieldType));
return;
}

const size_t passed = countBytesInFilter(filter->data(), limit);
x.resize(current_size + passed);
UInt8 prev = (*filter)[0];
size_t count = 1;
for (size_t i = 1; i < limit; ++i)
{
bool break_point = ((*filter)[i] != prev);
if (break_point && prev)
{
size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * count);
current_size += size / sizeof(FieldType);
count = 1;
}
else if (break_point && !prev)
{
istr.ignore(sizeof(FieldType) * count);
count = 1;
}
else
{
++count;
}
prev = (*filter)[i];
}
if (prev)
{
size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * count);
current_size += size / sizeof(FieldType);
}

x.resize(current_size);
}

template <typename T>
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/DataTypes/DataTypeDecimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ class DataTypeDecimal final : public IDataType
void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;

void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
const override;
void deserializeBinaryBulk(
IColumn & column,
ReadBuffer & istr,
size_t limit,
double avg_value_size_hint,
const IColumn::Filter * filter) const override;

void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;

Expand Down
48 changes: 43 additions & 5 deletions dbms/src/DataTypes/DataTypeEnum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Columns/ColumnsCommon.h>
#include <Common/UTF8Helpers.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeEnum.h>
Expand Down Expand Up @@ -225,13 +226,50 @@ void DataTypeEnum<Type>::deserializeBinaryBulk(
IColumn & column,
ReadBuffer & istr,
size_t limit,
double /*avg_value_size_hint*/) const
double /*avg_value_size_hint*/,
const IColumn::Filter * filter) const
{
auto & x = typeid_cast<ColumnType &>(column).getData();
const auto initial_size = x.size();
x.resize(initial_size + limit);
const auto size = istr.readBig(reinterpret_cast<char *>(&x[initial_size]), sizeof(FieldType) * limit);
x.resize(initial_size + size / sizeof(FieldType));
size_t current_size = x.size();
if (!filter)
{
x.resize(current_size + limit);
size_t size = istr.readBig(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * limit);
x.resize(current_size + size / sizeof(FieldType));
return;
}

const size_t passed = countBytesInFilter(filter->data(), limit);
x.resize(current_size + passed);
UInt8 prev = (*filter)[0];
size_t count = 1;
for (size_t i = 1; i < limit; ++i)
{
bool break_point = ((*filter)[i] != prev);
if (break_point && prev)
{
size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * count);
current_size += size / sizeof(FieldType);
count = 1;
}
else if (break_point && !prev)
{
istr.ignore(sizeof(FieldType) * count);
count = 1;
}
else
{
++count;
}
prev = (*filter)[i];
}
if (prev)
{
size_t size = istr.read(reinterpret_cast<char *>(&x[current_size]), sizeof(FieldType) * count);
current_size += size / sizeof(FieldType);
}

x.resize(current_size);
}

template <typename Type>
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/DataTypes/DataTypeEnum.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,12 @@ class DataTypeEnum final : public IDataTypeEnum
void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const override;

void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
const override;
void deserializeBinaryBulk(
IColumn & column,
ReadBuffer & istr,
size_t limit,
double avg_value_size_hint,
const IColumn::Filter * filter) const override;

MutableColumnPtr createColumn() const override { return ColumnType::create(); }

Expand Down
54 changes: 45 additions & 9 deletions dbms/src/DataTypes/DataTypeFixedString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Columns/ColumnConst.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeFactory.h>
Expand Down Expand Up @@ -105,19 +106,54 @@ void DataTypeFixedString::deserializeBinaryBulk(
IColumn & column,
ReadBuffer & istr,
size_t limit,
double /*avg_value_size_hint*/) const
double /*avg_value_size_hint*/,
const IColumn::Filter * filter) const
{
ColumnFixedString::Chars_t & data = typeid_cast<ColumnFixedString &>(column).getChars();
auto & data = typeid_cast<ColumnFixedString &>(column).getChars();
size_t current_size = data.size();
if (!filter)
{
data.resize(current_size + limit * n);
size_t size = istr.readBig(reinterpret_cast<char *>(&data[current_size]), n * limit);

size_t initial_size = data.size();
size_t max_bytes = limit * n;
data.resize(initial_size + max_bytes);
size_t read_bytes = istr.readBig(reinterpret_cast<char *>(&data[initial_size]), max_bytes);
if (size % n != 0)
throw Exception("Cannot read all data of type FixedString", ErrorCodes::CANNOT_READ_ALL_DATA);

data.resize(current_size + size);
return;
}

if (read_bytes % n != 0)
throw Exception("Cannot read all data of type FixedString", ErrorCodes::CANNOT_READ_ALL_DATA);
const size_t passed = countBytesInFilter(filter->data(), limit);
data.resize(current_size + passed * n);
UInt8 prev = (*filter)[0];
size_t count = 1;
for (size_t i = 1; i < limit; ++i)
{
bool break_point = ((*filter)[i] != prev);
if (break_point && prev)
{
size_t size = istr.read(reinterpret_cast<char *>(&data[current_size]), n * count);
current_size += size;
count = 1;
}
else if (break_point && !prev)
{
istr.ignore(n * count);
count = 1;
}
else
{
++count;
}
prev = (*filter)[i];
}
if (prev)
{
size_t size = istr.read(reinterpret_cast<char *>(&data[current_size]), n * count);
current_size += size;
}

data.resize(initial_size + read_bytes);
data.resize(current_size);
}


Expand Down
8 changes: 6 additions & 2 deletions dbms/src/DataTypes/DataTypeFixedString.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ class DataTypeFixedString final : public IDataType
void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;

void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
const override;
void deserializeBinaryBulk(
IColumn & column,
ReadBuffer & istr,
size_t limit,
double avg_value_size_hint,
const IColumn::Filter * filter) const override;

void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/DataTypes/DataTypeNothing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ void DataTypeNothing::deserializeBinaryBulk(
IColumn & column,
ReadBuffer & istr,
size_t limit,
double /*avg_value_size_hint*/) const
double,
const IColumn::Filter *) const
{
typeid_cast<ColumnNothing &>(column).addSize(istr.tryIgnore(limit));
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataTypes/DataTypeNothing.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class DataTypeNothing final : public IDataTypeDummy

/// These methods read and write zero bytes just to allow to figure out size of column.
void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint)
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double, const IColumn::Filter *)
const override;

bool equals(const IDataType & rhs) const override;
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/DataTypes/DataTypeNullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ void DataTypeNullable::deserializeBinaryBulkWithMultipleStreams(
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath & path) const
SubstreamPath & path,
const IColumn::Filter * filter) const
{
auto & col = static_cast<ColumnNullable &>(column);

path.emplace_back(Substream::NullMap);
if (auto * stream = getter(path))
DataTypeUInt8().deserializeBinaryBulk(col.getNullMapColumn(), *stream, limit, 0);
DataTypeUInt8().deserializeBinaryBulk(col.getNullMapColumn(), *stream, limit, 0, filter);

path.back() = Substream::NullableElements;
nested_data_type->deserializeBinaryBulkWithMultipleStreams(
Expand All @@ -110,7 +111,8 @@ void DataTypeNullable::deserializeBinaryBulkWithMultipleStreams(
limit,
avg_value_size_hint,
position_independent_encoding,
path);
path,
filter);
}


Expand Down
3 changes: 2 additions & 1 deletion dbms/src/DataTypes/DataTypeNullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class DataTypeNullable final : public IDataType
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath & path) const override;
SubstreamPath & path,
const IColumn::Filter * filter) const override;

void serializeBinary(const Field & field, WriteBuffer & ostr) const override
{
Expand Down
Loading