Skip to content

Commit

Permalink
fix: fix growing index data race and properly handle build error (#31170
Browse files Browse the repository at this point in the history
)

issue: #31169

also properly handling index build error by re-create a new index so
that nothing will be left in the previous failed index build attempt.

Signed-off-by: Buqian Zheng <[email protected]>
  • Loading branch information
zhengbuqian authored Mar 13, 2024
1 parent 57a5e44 commit 7fc3094
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
8 changes: 8 additions & 0 deletions internal/core/src/segcore/FieldIndexing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta,
field_index_meta,
segcore_config,
SegmentType::Growing)) {
recreate_index();
}

void
VectorFieldIndexing::recreate_index() {
index_ = std::make_unique<index::VectorMemIndex<float>>(
config_->GetIndexType(),
config_->GetMetricType(),
Expand Down Expand Up @@ -128,6 +133,8 @@ VectorFieldIndexing::AppendSegmentIndexSparse(int64_t reserved_offset,
}
} catch (SegcoreError& error) {
LOG_ERROR("growing sparse index build error: {}", error.what());
recreate_index();
index_cur_ = 0;
return;
}
index_cur_.fetch_add(rows);
Expand Down Expand Up @@ -204,6 +211,7 @@ VectorFieldIndexing::AppendSegmentIndexDense(int64_t reserved_offset,
index_->BuildWithDataset(dataset, conf);
} catch (SegcoreError& error) {
LOG_ERROR("growing index build error: {}", error.what());
recreate_index();
return;
}
index_cur_.fetch_add(vec_num);
Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/segcore/FieldIndexing.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ class VectorFieldIndexing : public FieldIndexing {
get_search_params(const SearchInfo& searchInfo) const;

private:
void
recreate_index();
// current number of rows in index.
std::atomic<idx_t> index_cur_ = 0;
// whether the growing index has been built.
Expand Down
72 changes: 49 additions & 23 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,22 +627,35 @@ SegmentGrowingImpl::bulk_subscript_sparse_float_vector_impl(
milvus::proto::schema::SparseFloatArray* output) const {
AssertInfo(HasRawData(field_id.get()), "Growing segment loss raw data");

// if index has finished building index, grab from index
// if index has finished building, grab from index without any
// synchronization operations.
if (indexing_record_.SyncDataWithIndex(field_id)) {
indexing_record_.GetDataFromIndex(
field_id, seg_offsets, count, 0, output);
return;
}
// else copy from raw data
std::lock_guard<std::shared_mutex> guard(chunk_mutex_);
SparseRowsToProto(
[&](size_t i) {
auto offset = seg_offsets[i];
return offset != INVALID_SEG_OFFSET ? vec_raw->get_element(offset)
: nullptr;
},
count,
output);
{
std::lock_guard<std::shared_mutex> guard(chunk_mutex_);
// check again after lock to make sure: if index has finished building
// after the above check but before we grabbed the lock, we should grab
// from index as the data in chunk may have been removed in
// try_remove_chunks.
if (!indexing_record_.SyncDataWithIndex(field_id)) {
// copy from raw data
SparseRowsToProto(
[&](size_t i) {
auto offset = seg_offsets[i];
return offset != INVALID_SEG_OFFSET
? vec_raw->get_element(offset)
: nullptr;
},
count,
output);
return;
}
// else: release lock and copy from index
}
indexing_record_.GetDataFromIndex(field_id, seg_offsets, count, 0, output);
}

template <typename S, typename T>
Expand Down Expand Up @@ -675,25 +688,38 @@ SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id,

// HasRawData interface guarantees that data can be fetched from growing segment
AssertInfo(HasRawData(field_id.get()), "Growing segment loss raw data");
// when data is in sync with index

// if index has finished building, grab from index without any
// synchronization operations.
if (indexing_record_.SyncDataWithIndex(field_id)) {
indexing_record_.GetDataFromIndex(
field_id, seg_offsets, count, element_sizeof, output_raw);
return;
}
// else copy from chunk
std::lock_guard<std::shared_mutex> guard(chunk_mutex_);
auto output_base = reinterpret_cast<char*>(output_raw);
for (int i = 0; i < count; ++i) {
auto dst = output_base + i * element_sizeof;
auto offset = seg_offsets[i];
if (offset == INVALID_SEG_OFFSET) {
memset(dst, 0, element_sizeof);
} else {
auto src = (const uint8_t*)vec.get_element(offset);
memcpy(dst, src, element_sizeof);
{
std::lock_guard<std::shared_mutex> guard(chunk_mutex_);
// check again after lock to make sure: if index has finished building
// after the above check but before we grabbed the lock, we should grab
// from index as the data in chunk may have been removed in
// try_remove_chunks.
if (!indexing_record_.SyncDataWithIndex(field_id)) {
auto output_base = reinterpret_cast<char*>(output_raw);
for (int i = 0; i < count; ++i) {
auto dst = output_base + i * element_sizeof;
auto offset = seg_offsets[i];
if (offset == INVALID_SEG_OFFSET) {
memset(dst, 0, element_sizeof);
} else {
auto src = (const uint8_t*)vec.get_element(offset);
memcpy(dst, src, element_sizeof);
}
}
return;
}
// else: release lock and copy from index
}
indexing_record_.GetDataFromIndex(
field_id, seg_offsets, count, element_sizeof, output_raw);
}

template <typename S, typename T>
Expand Down

0 comments on commit 7fc3094

Please sign in to comment.