From 058527dbc205e595d0517c80ffdc4cebe09559ac Mon Sep 17 00:00:00 2001 From: MrPresent-Han Date: Wed, 8 Jan 2025 08:48:14 -0500 Subject: [PATCH] enhance: refine variable-length-type memory usage(#38736) Signed-off-by: MrPresent-Han --- internal/core/src/common/Chunk.cpp | 68 +++++++------ internal/core/src/common/Chunk.h | 87 +++++++++++++---- internal/core/src/common/ChunkTarget.h | 6 ++ internal/core/src/common/ChunkWriter.cpp | 95 ++++++++----------- internal/core/src/common/ChunkWriter.h | 16 ++++ internal/core/src/common/Common.h | 2 +- internal/core/src/exec/expression/Expr.h | 3 +- .../core/src/exec/expression/TermExpr.cpp | 2 +- internal/core/src/mmap/ChunkedColumn.h | 27 +++++- internal/core/src/mmap/Column.h | 12 +++ .../src/segcore/ChunkedSegmentSealedImpl.cpp | 49 +++++----- .../src/segcore/ChunkedSegmentSealedImpl.h | 11 ++- .../core/src/segcore/SegmentGrowingImpl.cpp | 18 +++- .../core/src/segcore/SegmentGrowingImpl.h | 11 ++- internal/core/src/segcore/SegmentInterface.h | 62 ++++++------ .../core/src/segcore/SegmentSealedImpl.cpp | 25 ++++- internal/core/src/segcore/SegmentSealedImpl.h | 11 ++- internal/core/src/storage/PayloadReader.cpp | 3 +- internal/core/unittest/test_chunk.cpp | 10 +- .../core/unittest/test_chunked_segment.cpp | 5 +- 20 files changed, 335 insertions(+), 188 deletions(-) diff --git a/internal/core/src/common/Chunk.cpp b/internal/core/src/common/Chunk.cpp index ca912e128c0dd..37c9416eb6f61 100644 --- a/internal/core/src/common/Chunk.cpp +++ b/internal/core/src/common/Chunk.cpp @@ -19,13 +19,42 @@ namespace milvus { std::pair, FixedVector> -StringChunk::StringViews() { +StringChunk::StringViews( + std::optional> offset_len = std::nullopt) { + auto start_offset = 0; + auto len = row_nums_; + if (offset_len.has_value()) { + start_offset = offset_len->first; + len = offset_len->second; + AssertInfo( + start_offset >= 0 && start_offset < row_nums_, + "Retrieve string views with out-of-bound offset:{}, len:{}, wrong", + start_offset, + len); + AssertInfo( + len > 0 && len <= row_nums_, + "Retrieve string views with out-of-bound offset:{}, len:{}, wrong", + start_offset, + len); + AssertInfo( + start_offset + len <= row_nums_, + "Retrieve string views with out-of-bound offset:{}, len:{}, wrong", + start_offset, + len); + } + std::vector ret; - ret.reserve(row_nums_); - for (int i = 0; i < row_nums_; i++) { + ret.reserve(len); + auto end_offset = start_offset + len; + for (auto i = start_offset; i < end_offset; i++) { ret.emplace_back(data_ + offsets_[i], offsets_[i + 1] - offsets_[i]); } - return {ret, valid_}; + if (nullable_) { + FixedVector res_valid(valid_.begin() + start_offset, + valid_.begin() + end_offset); + return {ret, std::move(res_valid)}; + } + return {ret, {}}; } std::pair, FixedVector> @@ -43,35 +72,4 @@ StringChunk::ViewsByOffsets(const FixedVector& offsets) { return {ret, valid_res}; } -void -ArrayChunk::ConstructViews() { - views_.reserve(row_nums_); - - for (int i = 0; i < row_nums_; ++i) { - int offset = offsets_lens_[2 * i]; - int next_offset = offsets_lens_[2 * (i + 1)]; - int len = offsets_lens_[2 * i + 1]; - auto data_ptr = data_ + offset; - auto offsets_bytes_len = 0; - uint32_t* offsets_ptr = nullptr; - if (IsStringDataType(element_type_)) { - offsets_bytes_len = len * sizeof(uint32_t); - offsets_ptr = reinterpret_cast(data_ptr); - } - views_.emplace_back(data_ptr + offsets_bytes_len, - len, - next_offset - offset - offsets_bytes_len, - element_type_, - offsets_ptr); - } -} - -SpanBase -ArrayChunk::Span() const { - return SpanBase(views_.data(), - nullable_ ? valid_.data() : nullptr, - views_.size(), - sizeof(ArrayView)); -} - } // namespace milvus diff --git a/internal/core/src/common/Chunk.h b/internal/core/src/common/Chunk.h index 8dd2e14039267..94c87ee36f8d1 100644 --- a/internal/core/src/common/Chunk.h +++ b/internal/core/src/common/Chunk.h @@ -29,6 +29,7 @@ #include "simdjson/common_defs.h" #include "sys/mman.h" #include "common/Types.h" + namespace milvus { constexpr uint64_t MMAP_STRING_PADDING = 1; constexpr uint64_t MMAP_ARRAY_PADDING = 1; @@ -132,8 +133,11 @@ class StringChunk : public Chunk { StringChunk() = default; StringChunk(int32_t row_nums, char* data, uint64_t size, bool nullable) : Chunk(row_nums, data, size, nullable) { - auto null_bitmap_bytes_num = (row_nums + 7) / 8; - offsets_ = reinterpret_cast(data + null_bitmap_bytes_num); + auto null_bitmap_bytes_num = 0; + if (nullable) { + null_bitmap_bytes_num = (row_nums + 7) / 8; + } + offsets_ = reinterpret_cast(data + null_bitmap_bytes_num); } std::string_view @@ -146,7 +150,7 @@ class StringChunk : public Chunk { } std::pair, FixedVector> - StringViews(); + StringViews(std::optional> offset_len); int binary_search_string(std::string_view target) { @@ -181,13 +185,13 @@ class StringChunk : public Chunk { return (*this)[idx].data(); } - uint64_t* + uint32_t* Offsets() { return offsets_; } protected: - uint64_t* offsets_; + uint32_t* offsets_; }; using JSONChunk = StringChunk; @@ -200,22 +204,72 @@ class ArrayChunk : public Chunk { milvus::DataType element_type, bool nullable) : Chunk(row_nums, data, size, nullable), element_type_(element_type) { - auto null_bitmap_bytes_num = (row_nums + 7) / 8; + auto null_bitmap_bytes_num = 0; + if (nullable) { + null_bitmap_bytes_num = (row_nums + 7) / 8; + } offsets_lens_ = - reinterpret_cast(data + null_bitmap_bytes_num); - ConstructViews(); + reinterpret_cast(data + null_bitmap_bytes_num); } - SpanBase - Span() const; - ArrayView - View(int64_t idx) const { - return views_[idx]; + View(int idx) const { + int idx_off = 2 * idx; + auto offset = offsets_lens_[idx_off]; + auto len = offsets_lens_[idx_off + 1]; + auto next_offset = offsets_lens_[idx_off + 2]; + auto data_ptr = data_ + offset; + uint32_t offsets_bytes_len = 0; + uint32_t* offsets_ptr = nullptr; + if (IsStringDataType(element_type_)) { + offsets_bytes_len = len * sizeof(uint32_t); + offsets_ptr = reinterpret_cast(data_ptr); + } + + return ArrayView(data_ptr + offsets_bytes_len, + len, + next_offset - offset - offsets_bytes_len, + element_type_, + offsets_ptr); } - void - ConstructViews(); + std::pair, FixedVector> + Views(std::optional> offset_len = + std::nullopt) const { + auto start_offset = 0; + auto len = row_nums_; + if (offset_len.has_value()) { + start_offset = offset_len->first; + len = offset_len->second; + AssertInfo(start_offset >= 0 && start_offset < row_nums_, + "Retrieve array views with out-of-bound offset:{}, " + "len:{}, wrong", + start_offset, + len); + AssertInfo(len > 0 && len <= row_nums_, + "Retrieve array views with out-of-bound offset:{}, " + "len:{}, wrong", + start_offset, + len); + AssertInfo(start_offset + len <= row_nums_, + "Retrieve array views with out-of-bound offset:{}, " + "len:{}, wrong", + start_offset, + len); + } + std::vector views; + views.reserve(len); + auto end_offset = start_offset + len; + for (auto i = start_offset; i < end_offset; i++) { + views.emplace_back(View(i)); + } + if (nullable_) { + FixedVector res_valid(valid_.begin() + start_offset, + valid_.begin() + end_offset); + return {std::move(views), std::move(res_valid)}; + } + return {std::move(views), {}}; + } const char* ValueAt(int64_t idx) const override { @@ -225,8 +279,7 @@ class ArrayChunk : public Chunk { private: milvus::DataType element_type_; - uint64_t* offsets_lens_; - std::vector views_; + uint32_t* offsets_lens_; }; class SparseFloatVectorChunk : public Chunk { diff --git a/internal/core/src/common/ChunkTarget.h b/internal/core/src/common/ChunkTarget.h index 91b0655c63373..ee441e1523c18 100644 --- a/internal/core/src/common/ChunkTarget.h +++ b/internal/core/src/common/ChunkTarget.h @@ -56,6 +56,12 @@ class MmapChunkTarget : public ChunkTarget { clear() { pos = 0; } + + void + write(uint32_t value) { + *reinterpret_cast(buf + pos) = value; + pos += sizeof(uint32_t); + } }; public: diff --git a/internal/core/src/common/ChunkWriter.cpp b/internal/core/src/common/ChunkWriter.cpp index 1d05b8ddcffa5..bf70cd81f425a 100644 --- a/internal/core/src/common/ChunkWriter.cpp +++ b/internal/core/src/common/ChunkWriter.cpp @@ -29,8 +29,9 @@ namespace milvus { void StringChunkWriter::write(std::shared_ptr data) { auto size = 0; - std::vector strs; + std::vector strs; std::vector> null_bitmaps; + auto do_iteration_duration = 0; for (auto batch : *data) { auto data = batch.ValueOrDie()->column(0); auto array = std::dynamic_pointer_cast(data); @@ -39,12 +40,15 @@ StringChunkWriter::write(std::shared_ptr data) { strs.emplace_back(str); size += str.size(); } - auto null_bitmap_n = (data->length() + 7) / 8; - null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); - size += null_bitmap_n; + if (nullable_) { + auto null_bitmap_n = (data->length() + 7) / 8; + null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); + size += null_bitmap_n; + } row_nums_ += array->length(); } - size += sizeof(uint64_t) * (row_nums_ + 1) + MMAP_STRING_PADDING; + + size += sizeof(uint32_t) * (row_nums_ + 1) + MMAP_STRING_PADDING; if (file_) { target_ = std::make_shared(*file_, file_offset_); } else { @@ -53,19 +57,12 @@ StringChunkWriter::write(std::shared_ptr data) { // chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn, padding // write null bitmaps - for (auto [data, size] : null_bitmaps) { - if (data == nullptr) { - std::vector null_bitmap(size, 0xff); - target_->write(null_bitmap.data(), size); - } else { - target_->write(data, size); - } - } + write_null_bit_maps(null_bitmaps); // write data int offset_num = row_nums_ + 1; - int offset_start_pos = target_->tell() + sizeof(uint64_t) * offset_num; - std::vector offsets; + uint32_t offset_start_pos = target_->tell() + sizeof(uint32_t) * offset_num; + std::vector offsets; for (const auto& str : strs) { offsets.push_back(offset_start_pos); @@ -73,8 +70,7 @@ StringChunkWriter::write(std::shared_ptr data) { } offsets.push_back(offset_start_pos); - target_->write(offsets.data(), offsets.size() * sizeof(uint64_t)); - + target_->write(offsets.data(), offsets.size() * sizeof(uint32_t)); for (auto str : strs) { target_->write(str.data(), str.size()); } @@ -93,7 +89,6 @@ StringChunkWriter::finish() { void JSONChunkWriter::write(std::shared_ptr data) { auto size = 0; - std::vector jsons; std::vector> null_bitmaps; for (auto batch : *data) { @@ -105,14 +100,14 @@ JSONChunkWriter::write(std::shared_ptr data) { size += json.data().size(); jsons.push_back(std::move(json)); } - // AssertInfo(data->length() % 8 == 0, - // "String length should be multiple of 8"); - auto null_bitmap_n = (data->length() + 7) / 8; - null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); - size += null_bitmap_n; + if (nullable_) { + auto null_bitmap_n = (data->length() + 7) / 8; + null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); + size += null_bitmap_n; + } row_nums_ += array->length(); } - size += sizeof(uint64_t) * (row_nums_ + 1) + simdjson::SIMDJSON_PADDING; + size += sizeof(uint32_t) * (row_nums_ + 1) + simdjson::SIMDJSON_PADDING; if (file_) { target_ = std::make_shared(*file_, file_offset_); } else { @@ -121,26 +116,19 @@ JSONChunkWriter::write(std::shared_ptr data) { // chunk layout: null bitmaps, offset1, offset2, ... ,json1, json2, ..., jsonn // write null bitmaps - for (auto [data, size] : null_bitmaps) { - if (data == nullptr) { - std::vector null_bitmap(size, 0xff); - target_->write(null_bitmap.data(), size); - } else { - target_->write(data, size); - } - } + write_null_bit_maps(null_bitmaps); int offset_num = row_nums_ + 1; - int offset_start_pos = target_->tell() + sizeof(uint64_t) * offset_num; - std::vector offsets; - + uint32_t offset_start_pos = target_->tell() + sizeof(uint32_t) * offset_num; + std::vector offsets; + offsets.reserve(offset_num); for (const auto& json : jsons) { offsets.push_back(offset_start_pos); offset_start_pos += json.data().size(); } offsets.push_back(offset_start_pos); - target_->write(offsets.data(), offsets.size() * sizeof(uint64_t)); + target_->write(offsets.data(), offset_num * sizeof(uint32_t)); // write data for (const auto& json : jsons) { @@ -160,10 +148,10 @@ JSONChunkWriter::finish() { void ArrayChunkWriter::write(std::shared_ptr data) { auto size = 0; - auto is_string = IsStringDataType(element_type_); std::vector arrays; std::vector> null_bitmaps; + for (auto batch : *data) { auto data = batch.ValueOrDie()->column(0); auto array = std::dynamic_pointer_cast(data); @@ -180,13 +168,15 @@ ArrayChunkWriter::write(std::shared_ptr data) { } } row_nums_ += array->length(); - auto null_bitmap_n = (data->length() + 7) / 8; - null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); - size += null_bitmap_n; + if (nullable_) { + auto null_bitmap_n = (data->length() + 7) / 8; + null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); + size += null_bitmap_n; + } } // offsets + lens - size += sizeof(uint64_t) * (row_nums_ * 2 + 1) + MMAP_ARRAY_PADDING; + size += sizeof(uint32_t) * (row_nums_ * 2 + 1) + MMAP_ARRAY_PADDING; if (file_) { target_ = std::make_shared(*file_, file_offset_); } else { @@ -194,21 +184,14 @@ ArrayChunkWriter::write(std::shared_ptr data) { } // chunk layout: nullbitmaps, offsets, elem_off1, elem_off2, .. data1, data2, ..., datan, padding - for (auto [data, size] : null_bitmaps) { - if (data == nullptr) { - std::vector null_bitmap(size, 0xff); - target_->write(null_bitmap.data(), size); - } else { - target_->write(data, size); - } - } + write_null_bit_maps(null_bitmaps); int offsets_num = row_nums_ + 1; int len_num = row_nums_; - uint64_t offset_start_pos = - target_->tell() + sizeof(uint64_t) * (offsets_num + len_num); - std::vector offsets(offsets_num); - std::vector lens(len_num); + uint32_t offset_start_pos = + target_->tell() + sizeof(uint32_t) * (offsets_num + len_num); + std::vector offsets(offsets_num); + std::vector lens(len_num); for (auto i = 0; i < arrays.size(); i++) { auto& arr = arrays[i]; offsets[i] = offset_start_pos; @@ -222,11 +205,11 @@ ArrayChunkWriter::write(std::shared_ptr data) { for (int i = 0; i < offsets.size(); i++) { if (i == offsets.size() - 1) { - target_->write(&offsets[i], sizeof(uint64_t)); + target_->write(&offsets[i], sizeof(uint32_t)); break; } - target_->write(&offsets[i], sizeof(uint64_t)); - target_->write(&lens[i], sizeof(uint64_t)); + target_->write(&offsets[i], sizeof(uint32_t)); + target_->write(&lens[i], sizeof(uint32_t)); } for (auto& arr : arrays) { diff --git a/internal/core/src/common/ChunkWriter.h b/internal/core/src/common/ChunkWriter.h index ab3769f39a51a..5fe4c0ddf0ae5 100644 --- a/internal/core/src/common/ChunkWriter.h +++ b/internal/core/src/common/ChunkWriter.h @@ -43,6 +43,22 @@ class ChunkWriterBase { return target_->get(); } + void + write_null_bit_maps( + const std::vector>& null_bitmaps) { + if (nullable_) { + for (auto [data, size] : null_bitmaps) { + if (data != nullptr) { + target_->write(data, size); + } else { + // have to append always-true bitmap due to arrow optimize this + std::vector null_bitmap(size, 0xff); + target_->write(null_bitmap.data(), size); + } + } + } + } + protected: int row_nums_ = 0; File* file_ = nullptr; diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h index 49fcbcb7c8592..a691cf2f03d9f 100644 --- a/internal/core/src/common/Common.h +++ b/internal/core/src/common/Common.h @@ -51,7 +51,7 @@ SetDefaultExecEvalExprBatchSize(int64_t val); struct BufferView { struct Element { const char* data_; - uint64_t* offsets_; + uint32_t* offsets_; int start_; int end_; }; diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index f2bbc8cd7f6bb..ab94b42d22abf 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -647,7 +647,8 @@ class SegmentExpr : public Expr { if (!skip_func || !skip_func(skip_index, field_id_, i)) { bool is_seal = false; if constexpr (std::is_same_v || - std::is_same_v) { + std::is_same_v || + std::is_same_v) { if (segment_->type() == SegmentType::Sealed) { // first is the raw data, second is valid_data // use valid_data to see if raw data is null diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index 960d9731c9604..27df2a194b2d3 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -270,7 +270,7 @@ PhyTermFilterExpr::ExecTermArrayVariableInField(OffsetVector* input) { valid_res.set(); AssertInfo(expr_->vals_.size() == 1, - "element length in json array must be one"); + "element length in array array must be one"); ValueType target_val = GetValueFromProto(expr_->vals_[0]); auto execute_sub_batch = diff --git a/internal/core/src/mmap/ChunkedColumn.h b/internal/core/src/mmap/ChunkedColumn.h index 2195fadf20aaa..0136231b903f7 100644 --- a/internal/core/src/mmap/ChunkedColumn.h +++ b/internal/core/src/mmap/ChunkedColumn.h @@ -150,11 +150,19 @@ class ChunkedColumnBase : public ColumnBase { } virtual std::pair, FixedVector> - StringViews(int64_t chunk_id) const { + StringViews(int64_t chunk_id, + std::optional> offset_len) const { PanicInfo(ErrorCode::Unsupported, "StringViews only supported for VariableColumn"); } + virtual std::pair, FixedVector> + ArrayViews(int64_t chunk_id, + std::optional> offset_len) const { + PanicInfo(ErrorCode::Unsupported, + "ArrayViews only supported for ArrayChunkedColumn"); + } + virtual std::pair, FixedVector> ViewsByOffsets(int64_t chunk_id, const FixedVector& offsets) const { @@ -324,9 +332,11 @@ class ChunkedVariableColumn : public ChunkedColumnBase { } std::pair, FixedVector> - StringViews(int64_t chunk_id) const override { + StringViews(int64_t chunk_id, + std::optional> offset_len = + std::nullopt) const override { return std::dynamic_pointer_cast(chunks_[chunk_id]) - ->StringViews(); + ->StringViews(offset_len); } std::shared_ptr @@ -397,7 +407,8 @@ class ChunkedArrayColumn : public ChunkedColumnBase { SpanBase Span(int64_t chunk_id) const override { - return std::dynamic_pointer_cast(chunks_[chunk_id])->Span(); + PanicInfo(ErrorCode::NotImplemented, + "span() interface is not implemented for arr chunk column"); } ArrayView @@ -414,5 +425,13 @@ class ChunkedArrayColumn : public ChunkedColumnBase { ->View(offset_in_chunk) .output_data(); } + + std::pair, FixedVector> + ArrayViews(int64_t chunk_id, + std::optional> offset_len = + std::nullopt) const override { + return std::dynamic_pointer_cast(chunks_[chunk_id]) + ->Views(offset_len); + } }; } // namespace milvus \ No newline at end of file diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index d0e2ed3690f1e..d823b4dd9d10b 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -321,6 +321,12 @@ class SingleChunkColumnBase : public ColumnBase { "StringViews only supported for VariableColumn"); } + virtual std::pair, FixedVector> + ArrayViews() const { + PanicInfo(ErrorCode::Unsupported, + "ArrayView only supported for ArrayColumn"); + } + virtual std::pair, FixedVector> ViewsByOffsets(const FixedVector& offsets) const { PanicInfo(ErrorCode::Unsupported, @@ -952,6 +958,11 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase { ConstructViews(); } + std::pair, FixedVector> + ArrayViews() const override { + return {Views(), valid_data_}; + } + protected: void ConstructViews() { @@ -970,6 +981,7 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase { element_type_, element_indices_[last].data()); lens_.clear(); + indices_.clear(); } private: diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 39d4cf9fb02f9..3bb221b486c92 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -405,28 +405,9 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { std::make_shared(field_meta); std::shared_ptr r; while (data.arrow_reader_channel->pop(r)) { - // for (auto i = 0; i < field_data->get_num_rows(); i++) { - // auto rawValue = field_data->RawValue(i); - // auto array = - // static_cast(rawValue); - // if (field_data->IsNullable()) { - // var_column->Append(*array, - // field_data->is_valid(i)); - // } else { - // var_column->Append(*array); - // } - - // // we stores the offset for each array element, so there is a additional uint64_t for each array element - // field_data_size = - // array->byte_size() + sizeof(uint64_t); - // stats_.mem_size += - // array->byte_size() + sizeof(uint64_t); - // } - auto chunk = create_chunk(field_meta, 1, r->reader); var_column->AddChunk(chunk); } - // var_column->Seal(); column = std::move(var_column); break; } @@ -780,24 +761,42 @@ ChunkedSegmentSealedImpl::chunk_data_impl(FieldId field_id, return field_data->Span(chunk_id); } auto field_data = insert_record_.get_data_base(field_id); - AssertInfo(field_data->num_chunk() == 1, - "num chunk not equal to 1 for sealed segment"); // system field return field_data->get_span_base(0); } +std::pair, FixedVector> +ChunkedSegmentSealedImpl::chunk_array_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len = + std::nullopt) const { + std::shared_lock lck(mutex_); + AssertInfo(get_bit(field_data_ready_bitset_, field_id), + "Can't get bitset element at " + std::to_string(field_id.get())); + if (auto it = fields_.find(field_id); it != fields_.end()) { + auto& field_data = it->second; + return field_data->ArrayViews(chunk_id, offset_len); + } + PanicInfo(ErrorCode::UnexpectedError, + "chunk_array_view_impl only used for chunk column field "); +} + std::pair, FixedVector> -ChunkedSegmentSealedImpl::chunk_view_impl(FieldId field_id, - int64_t chunk_id) const { +ChunkedSegmentSealedImpl::chunk_string_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len = + std::nullopt) const { std::shared_lock lck(mutex_); AssertInfo(get_bit(field_data_ready_bitset_, field_id), "Can't get bitset element at " + std::to_string(field_id.get())); if (auto it = fields_.find(field_id); it != fields_.end()) { auto& field_data = it->second; - return field_data->StringViews(chunk_id); + return field_data->StringViews(chunk_id, offset_len); } PanicInfo(ErrorCode::UnexpectedError, - "chunk_view_impl only used for variable column field "); + "chunk_string_view_impl only used for variable column field "); } std::pair, FixedVector> diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 7f53f9d94a09d..b9743ea8c6d4c 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -209,7 +209,16 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { chunk_data_impl(FieldId field_id, int64_t chunk_id) const override; std::pair, FixedVector> - chunk_view_impl(FieldId field_id, int64_t chunk_id) const override; + chunk_string_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len) const override; + + std::pair, FixedVector> + chunk_array_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len) const override; std::pair, FixedVector> chunk_view_by_offsets(FieldId field_id, diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index c4715f931f5c2..55eac27a0cb89 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -368,9 +368,23 @@ SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { } std::pair, FixedVector> -SegmentGrowingImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const { +SegmentGrowingImpl::chunk_string_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len = + std::nullopt) const { + PanicInfo(ErrorCode::NotImplemented, + "chunk string view impl not implement for growing segment"); +} + +std::pair, FixedVector> +SegmentGrowingImpl::chunk_array_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len = + std::nullopt) const { PanicInfo(ErrorCode::NotImplemented, - "chunk view impl not implement for growing segment"); + "chunk array view impl not implement for growing segment"); } std::pair, FixedVector> diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 23fc4ff60f184..2ed860bb117b0 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -348,7 +348,16 @@ class SegmentGrowingImpl : public SegmentGrowing { chunk_data_impl(FieldId field_id, int64_t chunk_id) const override; std::pair, FixedVector> - chunk_view_impl(FieldId field_id, int64_t chunk_id) const override; + chunk_string_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len) const override; + + std::pair, FixedVector> + chunk_array_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len) const override; std::pair, FixedVector> chunk_view_by_offsets(FieldId field_id, diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 90e34ce78a7b2..48550fd6dfd00 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -147,18 +147,28 @@ class SegmentInternalInterface : public SegmentInterface { template std::pair, FixedVector> - chunk_view(FieldId field_id, int64_t chunk_id) const { - auto [string_views, valid_data] = chunk_view_impl(field_id, chunk_id); + chunk_view(FieldId field_id, + int64_t chunk_id, + std::optional> offset_len = + std::nullopt) const { if constexpr (std::is_same_v) { + auto [string_views, valid_data] = + chunk_string_view_impl(field_id, chunk_id, offset_len); return std::make_pair(std::move(string_views), std::move(valid_data)); - } else { - std::vector res; + } else if constexpr (std::is_same_v) { + auto [array_views, valid_data] = + chunk_array_view_impl(field_id, chunk_id, offset_len); + return std::make_pair(array_views, valid_data); + } else if constexpr (std::is_same_v) { + auto [string_views, valid_data] = + chunk_string_view_impl(field_id, chunk_id, offset_len); + std::vector res; res.reserve(string_views.size()); - for (const auto& view : string_views) { - res.emplace_back(view); + for (const auto& str_view : string_views) { + res.emplace_back(str_view); } - return std::make_pair(res, valid_data); + return {std::move(res), std::move(valid_data)}; } } @@ -172,31 +182,8 @@ class SegmentInternalInterface : public SegmentInterface { PanicInfo(ErrorCode::Unsupported, "get chunk views not supported for growing segment"); } - auto chunk_info = - get_chunk_buffer(field_id, chunk_id, start_offset, length); - BufferView buffer = chunk_info.first; - std::vector res; - res.reserve(length); - if (buffer.data_.index() == 1) { - char* pos = std::get<1>(buffer.data_).first; - for (size_t j = 0; j < length; j++) { - uint32_t size; - size = *reinterpret_cast(pos); - pos += sizeof(uint32_t); - res.emplace_back(ViewType(pos, size)); - pos += size; - } - } else { - auto elements = std::get<0>(buffer.data_); - for (auto& element : elements) { - for (int i = element.start_; i < element.end_; i++) { - res.emplace_back(ViewType( - element.data_ + element.offsets_[i], - element.offsets_[i + 1] - element.offsets_[i])); - } - } - } - return std::make_pair(res, chunk_info.second); + return chunk_view( + field_id, chunk_id, std::make_pair(start_offset, length)); } template @@ -426,7 +413,16 @@ class SegmentInternalInterface : public SegmentInterface { // internal API: return chunk string views in vector virtual std::pair, FixedVector> - chunk_view_impl(FieldId field_id, int64_t chunk_id) const = 0; + chunk_string_view_impl(FieldId field_id, + int64_t chunk_id, + std::optional> + offset_len = std::nullopt) const = 0; + + virtual std::pair, FixedVector> + chunk_array_view_impl(FieldId field_id, + int64_t chunk_id, + std::optional> + offset_len = std::nullopt) const = 0; // internal API: return buffer reference to field chunk data located from start_offset virtual std::pair> diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 15e22dff935c9..991c3f7420710 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -759,7 +759,11 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { } std::pair, FixedVector> -SegmentSealedImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const { +SegmentSealedImpl::chunk_string_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len = + std::nullopt) const { std::shared_lock lck(mutex_); AssertInfo(get_bit(field_data_ready_bitset_, field_id), "Can't get bitset element at " + std::to_string(field_id.get())); @@ -768,7 +772,24 @@ SegmentSealedImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const { return field_data->StringViews(); } PanicInfo(ErrorCode::UnexpectedError, - "chunk_view_impl only used for variable column field "); + "chunk_string_view_impl only used for variable column field "); +} + +std::pair, FixedVector> +SegmentSealedImpl::chunk_array_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len = + std::nullopt) const { + std::shared_lock lck(mutex_); + AssertInfo(get_bit(field_data_ready_bitset_, field_id), + "Can't get bitset element at " + std::to_string(field_id.get())); + if (auto it = fields_.find(field_id); it != fields_.end()) { + auto& field_data = it->second; + return field_data->ArrayViews(); + } + PanicInfo(ErrorCode::UnexpectedError, + "chunk_array_view_impl only used for array column field "); } std::pair, FixedVector> diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 03a33d014c9e5..c504c4c4ce767 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -222,7 +222,16 @@ class SegmentSealedImpl : public SegmentSealed { chunk_data_impl(FieldId field_id, int64_t chunk_id) const override; std::pair, FixedVector> - chunk_view_impl(FieldId field_id, int64_t chunk_id) const override; + chunk_string_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len) const override; + + std::pair, FixedVector> + chunk_array_view_impl( + FieldId field_id, + int64_t chunk_id, + std::optional> offset_len) const override; std::pair, FixedVector> chunk_view_by_offsets(FieldId field_id, diff --git a/internal/core/src/storage/PayloadReader.cpp b/internal/core/src/storage/PayloadReader.cpp index 4d38ac69bfbe2..75afe7a9ccf30 100644 --- a/internal/core/src/storage/PayloadReader.cpp +++ b/internal/core/src/storage/PayloadReader.cpp @@ -69,13 +69,12 @@ PayloadReader::init(std::shared_ptr input, ? GetDimensionFromFileMetaData( file_meta->schema()->Column(column_index), column_type_) : 1; - auto total_num_rows = file_meta->num_rows(); - std::shared_ptr<::arrow::RecordBatchReader> rb_reader; st = arrow_reader->GetRecordBatchReader(&rb_reader); AssertInfo(st.ok(), "get record batch reader"); if (is_field_data) { + auto total_num_rows = file_meta->num_rows(); field_data_ = CreateFieldData(column_type_, nullable_, dim_, total_num_rows); for (arrow::Result> maybe_batch : diff --git a/internal/core/unittest/test_chunk.cpp b/internal/core/unittest/test_chunk.cpp index 1c1eb84ce4910..0ecb3a679c198 100644 --- a/internal/core/unittest/test_chunk.cpp +++ b/internal/core/unittest/test_chunk.cpp @@ -95,7 +95,8 @@ TEST(chunk, test_variable_field) { FieldMeta field_meta( FieldName("a"), milvus::FieldId(1), DataType::STRING, false); auto chunk = create_chunk(field_meta, 1, rb_reader); - auto views = std::dynamic_pointer_cast(chunk)->StringViews(); + auto views = std::dynamic_pointer_cast(chunk)->StringViews( + std::nullopt); for (size_t i = 0; i < data.size(); ++i) { EXPECT_EQ(views.first[i], data[i]); } @@ -177,9 +178,10 @@ TEST(chunk, test_array) { DataType::STRING, false); auto chunk = create_chunk(field_meta, 1, rb_reader); - auto span = std::dynamic_pointer_cast(chunk)->Span(); - EXPECT_EQ(span.row_count(), 1); - auto arr = *(ArrayView*)span.data(); + auto [views, valid] = + std::dynamic_pointer_cast(chunk)->Views(std::nullopt); + EXPECT_EQ(views.size(), 1); + auto& arr = views[0]; for (size_t i = 0; i < arr.length(); ++i) { auto str = arr.get_data(i); EXPECT_EQ(str, field_string_data.string_data().data(i)); diff --git a/internal/core/unittest/test_chunked_segment.cpp b/internal/core/unittest/test_chunked_segment.cpp index 55fe403f9bc56..fda85f019eaf4 100644 --- a/internal/core/unittest/test_chunked_segment.cpp +++ b/internal/core/unittest/test_chunked_segment.cpp @@ -225,6 +225,7 @@ class TestChunkSegment : public testing::TestWithParam { str_data.push_back("test" + std::to_string(i)); } std::sort(str_data.begin(), str_data.end()); + std::vector validity(test_data_count, true); // generate data for (int chunk_id = 0; chunk_id < chunk_num; @@ -233,8 +234,8 @@ class TestChunkSegment : public testing::TestWithParam { std::iota(test_data.begin(), test_data.end(), start_id); auto builder = std::make_shared(); - auto status = - builder->AppendValues(test_data.begin(), test_data.end()); + auto status = builder->AppendValues( + test_data.begin(), test_data.end(), validity.begin()); ASSERT_TRUE(status.ok()); auto res = builder->Finish(); ASSERT_TRUE(res.ok());