diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index e56e5bffdee99..536e4cc40824e 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -201,6 +201,10 @@ IndexFactory::CreateVectorIndex( return std::make_unique>( index_type, metric_type, version, file_manager_context); } + case DataType::VECTOR_SPARSE_FLOAT: { + return std::make_unique>( + index_type, metric_type, version, file_manager_context); + } default: throw SegcoreError( DataTypeInvalid, @@ -323,6 +327,14 @@ IndexFactory::CreateVectorIndex( space, file_manager_context); } + case DataType::VECTOR_SPARSE_FLOAT: { + return std::make_unique>( + index_type, + metric_type, + version, + space, + file_manager_context); + } default: throw SegcoreError( DataTypeInvalid, diff --git a/internal/core/src/segcore/FieldIndexing.cpp b/internal/core/src/segcore/FieldIndexing.cpp index 641c89ff78549..478d958c48737 100644 --- a/internal/core/src/segcore/FieldIndexing.cpp +++ b/internal/core/src/segcore/FieldIndexing.cpp @@ -32,10 +32,12 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta, : FieldIndexing(field_meta, segcore_config), built_(false), sync_with_index_(false), - config_(std::make_unique(segment_max_row_count, - field_index_meta, - segcore_config, - SegmentType::Growing)) { + config_(std::make_unique( + segment_max_row_count, + field_index_meta, + segcore_config, + SegmentType::Growing, + IsSparseFloatVectorDataType(field_meta.get_data_type()))) { recreate_index(); } diff --git a/internal/core/src/segcore/IndexConfigGenerator.cpp b/internal/core/src/segcore/IndexConfigGenerator.cpp index c9c390667237c..0c0d041359a89 100644 --- a/internal/core/src/segcore/IndexConfigGenerator.cpp +++ b/internal/core/src/segcore/IndexConfigGenerator.cpp @@ -16,8 +16,11 @@ namespace milvus::segcore { VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout, const FieldIndexMeta& index_meta_, const SegcoreConfig& config, - const SegmentType& segment_type) - : max_index_row_count_(max_index_row_cout), config_(config) { + const SegmentType& segment_type, + const bool is_sparse) + : max_index_row_count_(max_index_row_cout), + config_(config), + is_sparse_(is_sparse) { origin_index_type_ = index_meta_.GetIndexType(); metric_type_ = index_meta_.GeMetricType(); // Currently for dense vector index, if the segment is growing, we use IVFCC @@ -29,11 +32,15 @@ VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout, // But for sparse vector index(INDEX_SPARSE_INVERTED_INDEX and // INDEX_SPARSE_WAND), those index themselves can be used as the temp index // type, so we can avoid the extra step of "releast temp and load". + // When using HNSW(cardinal) for sparse, we use INDEX_SPARSE_INVERTED_INDEX + // as the growing index. if (origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) { index_type_ = origin_index_type_; + } else if (is_sparse_) { + index_type_ = knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX; } else { index_type_ = support_index_types.at(segment_type); } @@ -58,9 +65,7 @@ VecIndexConfig::GetBuildThreshold() const noexcept { // For sparse, do not impose a threshold and start using index with any // number of rows. Unlike dense vector index, growing sparse vector index // does not require a minimum number of rows to train. - if (origin_index_type_ == - knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || - origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) { + if (is_sparse_) { return 0; } assert(VecIndexConfig::index_build_ratio.count(index_type_)); diff --git a/internal/core/src/segcore/IndexConfigGenerator.h b/internal/core/src/segcore/IndexConfigGenerator.h index 102e4f74f048c..bf0d0eced2871 100644 --- a/internal/core/src/segcore/IndexConfigGenerator.h +++ b/internal/core/src/segcore/IndexConfigGenerator.h @@ -44,7 +44,8 @@ class VecIndexConfig { VecIndexConfig(const int64_t max_index_row_count, const FieldIndexMeta& index_meta_, const SegcoreConfig& config, - const SegmentType& segment_type); + const SegmentType& segment_type, + const bool is_sparse); int64_t GetBuildThreshold() const noexcept; @@ -72,6 +73,8 @@ class VecIndexConfig { knowhere::MetricType metric_type_; + bool is_sparse_; + knowhere::Json build_params_; knowhere::Json search_params_; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 04b7c5f78ed65..19962018a62e9 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -1593,7 +1593,8 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) { new VecIndexConfig(row_count, field_index_meta, segcore_config_, - SegmentType::Sealed)); + SegmentType::Sealed, + is_sparse)); if (row_count < field_binlog_config->GetBuildThreshold()) { return false; } diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index a97f1503f90a9..32db184654985 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -452,10 +452,18 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); - auto local_data_path = storage::GenFieldRawDataPathPrefix( - local_chunk_manager, segment_id, field_id) + - "raw_data"; - local_chunk_manager->CreateFile(local_data_path); + std::string local_data_path; + bool file_created = false; + + auto init_file_info = [&](milvus::DataType dt) { + local_data_path = storage::GenFieldRawDataPathPrefix( + local_chunk_manager, segment_id, field_id) + + "raw_data"; + if (dt == milvus::DataType::VECTOR_SPARSE_FLOAT) { + local_data_path += ".sparse_u32_f32"; + } + local_chunk_manager->CreateFile(local_data_path); + }; // get batch raw data from s3 and write batch data to disk file // TODO: load and write of different batches at the same time @@ -473,17 +481,50 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { for (int i = 0; i < batch_size; ++i) { auto field_data = field_datas[i].get()->GetFieldData(); num_rows += uint32_t(field_data->get_num_rows()); - AssertInfo(dim == 0 || dim == field_data->get_dim(), - "inconsistent dim value in multi binlogs!"); - dim = field_data->get_dim(); - - auto data_size = - field_data->get_num_rows() * dim * sizeof(DataType); - local_chunk_manager->Write(local_data_path, - write_offset, - const_cast(field_data->Data()), - data_size); - write_offset += data_size; + auto data_type = field_data->get_data_type(); + if (!file_created) { + init_file_info(data_type); + file_created = true; + } + if (data_type == milvus::DataType::VECTOR_SPARSE_FLOAT) { + dim = std::max( + dim, + (uint32_t)( + std::dynamic_pointer_cast>( + field_data) + ->Dim())); + auto sparse_rows = + static_cast*>( + field_data->Data()); + for (size_t i = 0; i < field_data->Length(); ++i) { + auto row = sparse_rows[i]; + auto row_byte_size = row.data_byte_size(); + uint32_t nnz = row.size(); + local_chunk_manager->Write(local_data_path, + write_offset, + const_cast(&nnz), + sizeof(nnz)); + write_offset += sizeof(nnz); + local_chunk_manager->Write(local_data_path, + write_offset, + row.data(), + row_byte_size); + write_offset += row_byte_size; + } + } else { + AssertInfo(dim == 0 || dim == field_data->get_dim(), + "inconsistent dim value in multi binlogs!"); + dim = field_data->get_dim(); + + auto data_size = + field_data->get_num_rows() * dim * sizeof(DataType); + local_chunk_manager->Write( + local_data_path, + write_offset, + const_cast(field_data->Data()), + data_size); + write_offset += data_size; + } } };