Skip to content

Commit

Permalink
enhance: support sparse cardinal hnsw index (milvus-io#33656)
Browse files Browse the repository at this point in the history
issue: milvus-io#29419

Signed-off-by: Buqian Zheng <[email protected]>
  • Loading branch information
zhengbuqian authored and yellow-shine committed Jul 2, 2024
1 parent ef8f229 commit dcc7b2c
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 26 deletions.
12 changes: 12 additions & 0 deletions internal/core/src/index/IndexFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ IndexFactory::CreateVectorIndex(
return std::make_unique<VectorDiskAnnIndex<bin1>>(
index_type, metric_type, version, file_manager_context);
}
case DataType::VECTOR_SPARSE_FLOAT: {
return std::make_unique<VectorDiskAnnIndex<float>>(
index_type, metric_type, version, file_manager_context);
}
default:
throw SegcoreError(
DataTypeInvalid,
Expand Down Expand Up @@ -328,6 +332,14 @@ IndexFactory::CreateVectorIndex(
space,
file_manager_context);
}
case DataType::VECTOR_SPARSE_FLOAT: {
return std::make_unique<VectorDiskAnnIndex<float>>(
index_type,
metric_type,
version,
space,
file_manager_context);
}
default:
throw SegcoreError(
DataTypeInvalid,
Expand Down
10 changes: 6 additions & 4 deletions internal/core/src/segcore/FieldIndexing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta,
: FieldIndexing(field_meta, segcore_config),
built_(false),
sync_with_index_(false),
config_(std::make_unique<VecIndexConfig>(segment_max_row_count,
field_index_meta,
segcore_config,
SegmentType::Growing)) {
config_(std::make_unique<VecIndexConfig>(
segment_max_row_count,
field_index_meta,
segcore_config,
SegmentType::Growing,
IsSparseFloatVectorDataType(field_meta.get_data_type()))) {
recreate_index();
}

Expand Down
15 changes: 10 additions & 5 deletions internal/core/src/segcore/IndexConfigGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ namespace milvus::segcore {
VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout,
const FieldIndexMeta& index_meta_,
const SegcoreConfig& config,
const SegmentType& segment_type)
: max_index_row_count_(max_index_row_cout), config_(config) {
const SegmentType& segment_type,
const bool is_sparse)
: max_index_row_count_(max_index_row_cout),
config_(config),
is_sparse_(is_sparse) {
origin_index_type_ = index_meta_.GetIndexType();
metric_type_ = index_meta_.GeMetricType();
// Currently for dense vector index, if the segment is growing, we use IVFCC
Expand All @@ -29,11 +32,15 @@ VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout,
// But for sparse vector indexes (INDEX_SPARSE_INVERTED_INDEX and
// INDEX_SPARSE_WAND), the index types themselves can be used as the temp
// index type, so we can avoid the extra step of "release temp and load".
// When using HNSW(cardinal) for sparse, we use INDEX_SPARSE_INVERTED_INDEX
// as the growing index.

if (origin_index_type_ ==
knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX ||
origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) {
index_type_ = origin_index_type_;
} else if (is_sparse_) {
index_type_ = knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX;
} else {
index_type_ = support_index_types.at(segment_type);
}
Expand All @@ -58,9 +65,7 @@ VecIndexConfig::GetBuildThreshold() const noexcept {
// For sparse, do not impose a threshold and start using index with any
// number of rows. Unlike dense vector index, growing sparse vector index
// does not require a minimum number of rows to train.
if (origin_index_type_ ==
knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX ||
origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) {
if (is_sparse_) {
return 0;
}
assert(VecIndexConfig::index_build_ratio.count(index_type_));
Expand Down
5 changes: 4 additions & 1 deletion internal/core/src/segcore/IndexConfigGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class VecIndexConfig {
VecIndexConfig(const int64_t max_index_row_count,
const FieldIndexMeta& index_meta_,
const SegcoreConfig& config,
const SegmentType& segment_type);
const SegmentType& segment_type,
const bool is_sparse);

int64_t
GetBuildThreshold() const noexcept;
Expand Down Expand Up @@ -72,6 +73,8 @@ class VecIndexConfig {

knowhere::MetricType metric_type_;

bool is_sparse_;

knowhere::Json build_params_;

knowhere::Json search_params_;
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1593,7 +1593,8 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) {
new VecIndexConfig(row_count,
field_index_meta,
segcore_config_,
SegmentType::Sealed));
SegmentType::Sealed,
is_sparse));
if (row_count < field_binlog_config->GetBuildThreshold()) {
return false;
}
Expand Down
71 changes: 56 additions & 15 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,18 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {

auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto local_data_path = storage::GenFieldRawDataPathPrefix(
local_chunk_manager, segment_id, field_id) +
"raw_data";
local_chunk_manager->CreateFile(local_data_path);
std::string local_data_path;
bool file_created = false;

// Sets local_data_path for this segment/field and creates the file at that
// path through the local chunk manager. Captures enclosing locals by
// reference, so it overwrites the outer local_data_path.
//
// dt: data type of the field data being cached; only used to decide whether
//     the sparse-vector suffix is appended to the file name.
auto init_file_info = [&](milvus::DataType dt) {
// Base path: per-field raw-data prefix followed by the fixed name "raw_data".
local_data_path = storage::GenFieldRawDataPathPrefix(
local_chunk_manager, segment_id, field_id) +
"raw_data";
// Sparse float vectors get a distinct file name suffix.
// NOTE(review): presumably the suffix describes the on-disk row layout
// (uint32 indices / float32 values) — confirm with the reader of this file.
if (dt == milvus::DataType::VECTOR_SPARSE_FLOAT) {
local_data_path += ".sparse_u32_f32";
}
local_chunk_manager->CreateFile(local_data_path);
};

// get batch raw data from s3 and write batch data to disk file
// TODO: load and write of different batches at the same time
Expand All @@ -474,17 +482,50 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
for (int i = 0; i < batch_size; ++i) {
auto field_data = field_datas[i].get()->GetFieldData();
num_rows += uint32_t(field_data->get_num_rows());
AssertInfo(dim == 0 || dim == field_data->get_dim(),
"inconsistent dim value in multi binlogs!");
dim = field_data->get_dim();

auto data_size =
field_data->get_num_rows() * dim * sizeof(DataType);
local_chunk_manager->Write(local_data_path,
write_offset,
const_cast<void*>(field_data->Data()),
data_size);
write_offset += data_size;
auto data_type = field_data->get_data_type();
if (!file_created) {
init_file_info(data_type);
file_created = true;
}
if (data_type == milvus::DataType::VECTOR_SPARSE_FLOAT) {
dim = std::max(
dim,
(uint32_t)(
std::dynamic_pointer_cast<FieldData<SparseFloatVector>>(
field_data)
->Dim()));
auto sparse_rows =
static_cast<const knowhere::sparse::SparseRow<float>*>(
field_data->Data());
for (size_t i = 0; i < field_data->Length(); ++i) {
auto row = sparse_rows[i];
auto row_byte_size = row.data_byte_size();
uint32_t nnz = row.size();
local_chunk_manager->Write(local_data_path,
write_offset,
const_cast<uint32_t*>(&nnz),
sizeof(nnz));
write_offset += sizeof(nnz);
local_chunk_manager->Write(local_data_path,
write_offset,
row.data(),
row_byte_size);
write_offset += row_byte_size;
}
} else {
AssertInfo(dim == 0 || dim == field_data->get_dim(),
"inconsistent dim value in multi binlogs!");
dim = field_data->get_dim();

auto data_size =
field_data->get_num_rows() * dim * sizeof(DataType);
local_chunk_manager->Write(
local_data_path,
write_offset,
const_cast<void*>(field_data->Data()),
data_size);
write_offset += data_size;
}
}
};

Expand Down

0 comments on commit dcc7b2c

Please sign in to comment.