Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-45594: [C++][Parquet] POC: Optimize Parquet DecodeArrow in DeltaLengthByteArray #45622

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions cpp/src/arrow/array/builder_binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "arrow/util/binary_view_util.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
#include "arrow/visit_data_inline.h"

namespace arrow {

Expand Down Expand Up @@ -303,6 +304,77 @@ class BaseBinaryBuilder
return Status::OK();
}

Status AppendBinaryWithLengths(std::string_view binary, const int32_t* value_lengths,
int64_t length) {
Comment on lines +307 to +308
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not call this AppendValuesWithLengths?
Also, it should probably take the right offset type?
And you need to add a docstring...

Suggested change
Status AppendBinaryWithLengths(std::string_view binary, const int32_t* value_lengths,
int64_t length) {
/// XXX docstring
Status AppendValuesWithLengths(std::string_view binary, util::span<const offset_type> lengths) {

ARROW_RETURN_NOT_OK(Reserve(length));
UnsafeAppendToBitmap(/*valid_bytes=*/NULLPTR, length);
Comment on lines +309 to +310
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you call these after the error checks below?

// All values are valid
int64_t accum_length = 0;
for (int64_t i = 0; i < length; ++i) {
accum_length += value_lengths[i];
}
if (ARROW_PREDICT_FALSE(binary.size() < static_cast<size_t>(accum_length))) {
return Status::Invalid("Binary data is too short");
}
Comment on lines +316 to +318
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not check equality? It's trivial to call substr on a std::string_view.

Suggested change
if (ARROW_PREDICT_FALSE(binary.size() < static_cast<size_t>(accum_length))) {
return Status::Invalid("Binary data is too short");
}
if (ARROW_PREDICT_FALSE(binary.size() != static_cast<size_t>(accum_length))) {
return Status::Invalid("Binary size does not match lengths array");
}

if (ARROW_PREDICT_FALSE(binary.size() + value_data_builder_.length() >
std::numeric_limits<int32_t>::max())) {
return Status::Invalid("Append binary data too long");
}
Comment on lines +319 to +322
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call ValidateOverflow instead?

// Record where the new bytes will start *before* appending them: the offsets
// emitted by the loop below are computed relative to this position. Reading
// the builder length after the append would shift every offset by the size of
// the data that was just added.
const int64_t initialize_offset = value_data_builder_.length();
// Only the bytes covered by the summed lengths are copied into the builder.
std::string_view sub_data = binary.substr(0, accum_length);
ARROW_RETURN_NOT_OK(value_data_builder_.Append(
    reinterpret_cast<const uint8_t*>(sub_data.data()), sub_data.size()));
// Restart the running total; it is reused below as the per-value offset
// accumulator.
accum_length = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const int64_t initialize_offset = value_data_builder_.length();
const int64_t initial_offset = value_data_builder_.length();

for (int64_t i = 0; i < length; ++i) {
offsets_builder_.UnsafeAppend(
static_cast<int32_t>(initialize_offset + accum_length));
accum_length += value_lengths[i];
}
return Status::OK();
}

Status AppendBinaryWithLengths(std::string_view binary, const int32_t* value_lengths,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments below...

int64_t length, int64_t null_count,
const uint8_t* valid_bits, int64_t valid_bits_offset) {
if (valid_bits == NULLPTR || null_count == 0) {
return AppendBinaryWithLengths(binary, value_lengths, length);
}
// NOTE(review): no UnsafeAppendToBitmap call is visible in this overload,
// unlike the all-valid overload -- confirm the validity bits are appended.
ARROW_RETURN_NOT_OK(Reserve(length));
// Sum the first `length` entries of `value_lengths` to get the number of
// payload bytes to copy out of `binary`.
// NOTE(review): the bitmap visitor further down reads one `value_lengths`
// entry per *valid* slot only, so with nulls present it consumes fewer than
// `length` entries -- confirm whether this sum should cover `length` or
// `length - null_count` entries.
int64_t accum_length = 0;
for (int64_t i = 0; i < length; ++i) {
accum_length += value_lengths[i];
}
// Reject input whose byte payload is shorter than the summed lengths.
if (ARROW_PREDICT_FALSE(binary.size() < static_cast<size_t>(accum_length))) {
return Status::Invalid("Binary data is too short");
}
const int64_t original_offset = value_data_builder_.length();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use the same naming as above? e.g. initial_offset

// Keep the whole comparison inside ARROW_PREDICT_FALSE, as the other overflow
// check in this file does. With only the sum wrapped, the limit ends up being
// compared against the macro's result instead of the projected byte count.
if (ARROW_PREDICT_FALSE(original_offset + accum_length >
                        std::numeric_limits<int32_t>::max())) {
  return Status::Invalid("Append binary data too long");
}

std::string_view sub_data = binary.substr(0, accum_length);
ARROW_RETURN_NOT_OK(value_data_builder_.Append(
reinterpret_cast<const uint8_t*>(sub_data.data()), sub_data.size()));
int64_t length_idx = 0;
accum_length = 0;
RETURN_NOT_OK(VisitNullBitmapInline(
valid_bits, valid_bits_offset, length, null_count,
[&]() {
offsets_builder_.UnsafeAppend(
static_cast<int32_t>(original_offset + accum_length));
accum_length += value_lengths[length_idx];
++length_idx;
return Status::OK();
},
[&]() {
offsets_builder_.UnsafeAppend(
static_cast<int32_t>(original_offset + accum_length));
return Status::OK();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, we're not incrementing length_idx here? This is not usual in Builder APIs, I don't think this is a good idea.

}));
return Status::OK();
}

void Reset() override {
ArrayBuilder::Reset();
offsets_builder_.Reset();
Expand Down
38 changes: 36 additions & 2 deletions cpp/src/parquet/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ struct ArrowBinaryHelper<ByteArrayType> {
return acc_->builder->AppendNull();
}

bool CanFit(int64_t length) const { return length <= chunk_space_remaining_; }

private:
Status PushChunk() {
ARROW_ASSIGN_OR_RAISE(auto chunk, acc_->builder->Finish());
Expand All @@ -504,8 +506,6 @@ struct ArrowBinaryHelper<ByteArrayType> {
return Status::OK();
}

bool CanFit(int64_t length) const { return length <= chunk_space_remaining_; }

Accumulator* acc_;
int64_t entries_remaining_;
int64_t chunk_space_remaining_;
Expand Down Expand Up @@ -1675,13 +1675,47 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
num_valid_values_ = num_length;
}

/// \brief Bulk-decode values directly into the Arrow binary builder.
///
/// Rather than materializing one ByteArray per value, this hands the builder
/// the contiguous value bytes together with the already-buffered lengths.
///
/// \param[in] num_values number of slots to produce, nulls included
/// \param[in] null_count number of null slots among `num_values`
/// \param[in] valid_bits validity bitmap for the slots
/// \param[in] valid_bits_offset bit offset of the first slot in `valid_bits`
/// \param[in,out] out accumulator whose builder receives the values
/// \param[out] out_num_values set to the number of non-null values decoded
/// \return the status reported by the builder
/// \throws ParquetException if more non-null values are requested than are
///   available, or if the data reader cannot be advanced past the consumed bytes
Status DecodeArrowDenseFastPath(
    int num_values, int null_count, const uint8_t* valid_bits,
    int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out,
    int* out_num_values) {
  const int max_values = num_values - null_count;
  if (max_values > num_valid_values_) {
    throw ParquetException("Expected to decode ", max_values,
                           " values, but can only decode ", num_valid_values_,
                           " values.");
  }
  const int32_t* length_ptr = buffered_length_->data_as<int32_t>() + length_idx_;
  const auto bytes_left = decoder_->bytes_left();
  const int bytes_offset = len_ - bytes_left;
  const uint8_t* data_ptr = data_ + bytes_offset;
  // Build the view with an explicit size. Converting a bare `const char*`
  // would derive the size from the first NUL byte, which is meaningless for
  // binary payloads and may read past the end of the page.
  const std::string_view remaining_data(reinterpret_cast<const char*>(data_ptr),
                                        static_cast<size_t>(bytes_left));
  const int64_t origin_data_size = out->builder->value_data_length();
  // The builder's `length` argument is the number of slots (nulls included),
  // i.e. `num_values`, not `null_count`.
  // NOTE(review): `length_ptr` holds one entry per non-null value; confirm the
  // builder overload interprets `value_lengths` the same way when nulls exist.
  RETURN_NOT_OK(out->builder->AppendBinaryWithLengths(
      remaining_data, length_ptr, num_values, null_count, valid_bits,
      valid_bits_offset));
  // Skip the bytes the builder consumed; Advance() is called with a bit count.
  const int64_t consumed_bytes = out->builder->value_data_length() - origin_data_size;
  if (ARROW_PREDICT_FALSE(!decoder_->Advance(8 * consumed_bytes))) {
    ParquetException::EofException();
  }
  length_idx_ += max_values;
  this->num_values_ -= max_values;
  num_valid_values_ -= max_values;
  *out_num_values = max_values;
  return Status::OK();
}

Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out,
int* out_num_values) {
ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
RETURN_NOT_OK(helper.Prepare());

if (helper.CanFit(decoder_->bytes_left())) {
return DecodeArrowDenseFastPath(num_values, null_count, valid_bits,
valid_bits_offset, out, out_num_values);
}

std::vector<ByteArray> values(num_values - null_count);
const int num_valid_values = Decode(values.data(), num_values - null_count);
if (ARROW_PREDICT_FALSE(num_values - null_count != num_valid_values)) {
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/parquet/encoding_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,27 @@ class BM_ArrowBinaryPlain : public BenchmarkDecodeArrowByteArray {
}
};

/// Benchmark fixture exercising the DELTA_LENGTH_BYTE_ARRAY encoding for
/// BYTE_ARRAY data, both when encoding and when decoding into Arrow.
class BM_ArrowBinaryDeltaLength : public BenchmarkDecodeArrowByteArray {
 public:
  /// Encode the Arrow input array and keep the flushed bytes in `buffer_`.
  void DoEncodeArrow() override {
    auto arrow_encoder = MakeTypedEncoder<ByteArrayType>(kEncoding);
    arrow_encoder->Put(*input_array_);
    buffer_ = arrow_encoder->FlushValues();
  }

  /// Encode the low-level value array and keep the flushed bytes in `buffer_`.
  void DoEncodeLowLevel() override {
    auto low_level_encoder = MakeTypedEncoder<ByteArrayType>(kEncoding);
    low_level_encoder->Put(values_.data(), num_values_);
    buffer_ = low_level_encoder->FlushValues();
  }

  /// Create a decoder for this encoding, pointed at the encoded `buffer_`.
  std::unique_ptr<ByteArrayDecoder> InitializeDecoder() override {
    auto decoder = MakeTypedDecoder<ByteArrayType>(kEncoding);
    const int encoded_size = static_cast<int>(buffer_->size());
    decoder->SetData(num_values_, buffer_->data(), encoded_size);
    return decoder;
  }

 private:
  /// The encoding shared by every encoder/decoder this fixture creates.
  static constexpr auto kEncoding = Encoding::DELTA_LENGTH_BYTE_ARRAY;
};

BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, EncodeArrow)
(benchmark::State& state) { EncodeArrowBenchmark(state); }
BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, EncodeArrow)->Range(1 << 18, 1 << 20);
Expand All @@ -1284,6 +1305,11 @@ BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dense)
BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dense)
->Range(MIN_RANGE, MAX_RANGE);

// Define and register the dense Arrow decode benchmark on the
// BM_ArrowBinaryDeltaLength fixture; it runs DecodeArrowDenseBenchmark over
// the MIN_RANGE..MAX_RANGE argument range.
BENCHMARK_DEFINE_F(BM_ArrowBinaryDeltaLength, DL_DecodeArrow_Dense)
(benchmark::State& state) { DecodeArrowDenseBenchmark(state); }
BENCHMARK_REGISTER_F(BM_ArrowBinaryDeltaLength, DL_DecodeArrow_Dense)
->Range(MIN_RANGE, MAX_RANGE);

BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrow_Dict)
(benchmark::State& state) { DecodeArrowDictBenchmark(state); }
BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrow_Dict)->Range(MIN_RANGE, MAX_RANGE);
Expand Down
Loading