diff --git a/internal/core/src/common/FieldData.cpp b/internal/core/src/common/FieldData.cpp index a9bd2f817f281..5f6fb00d720b9 100644 --- a/internal/core/src/common/FieldData.cpp +++ b/internal/core/src/common/FieldData.cpp @@ -26,10 +26,10 @@ namespace milvus { -template +template void -FieldDataImpl::FillFieldData(const void* source, - ssize_t element_count) { +FieldDataImpl::FillFieldData(const void* source, + ssize_t element_count) { if (element_count == 0) { return; } @@ -57,9 +57,9 @@ GetDataInfoFromArray(const std::shared_ptr array) { return std::make_pair(typed_array->raw_values(), element_count); } -template +template void -FieldDataImpl::FillFieldData( +FieldDataImpl::FillFieldData( const std::shared_ptr array) { AssertInfo(array != nullptr, "null arrow array"); auto element_count = array->length(); @@ -159,6 +159,18 @@ FieldDataImpl::FillFieldData( array); return FillFieldData(array_info.first, array_info.second); } + case DataType::VECTOR_SPARSE_FLOAT: { + AssertInfo(array->type()->id() == arrow::Type::type::BINARY, + "inconsistent data type"); + auto arr = std::dynamic_pointer_cast(array); + std::vector> values; + for (size_t index = 0; index < element_count; ++index) { + auto view = arr->GetString(index); + values.push_back( + CopyAndWrapSparseRow(view.data(), view.size())); + } + return FillFieldData(values.data(), element_count); + } default: { throw SegcoreError(DataTypeInvalid, GetName() + "::FillFieldData" + @@ -186,6 +198,7 @@ template class FieldDataImpl; template class FieldDataImpl; template class FieldDataImpl; template class FieldDataImpl; +template class FieldDataImpl, true>; FieldDataPtr InitScalarFieldData(const DataType& type, int64_t cap_rows) { diff --git a/internal/core/src/common/FieldData.h b/internal/core/src/common/FieldData.h index b767231da3e33..60e0c74b3ad56 100644 --- a/internal/core/src/common/FieldData.h +++ b/internal/core/src/common/FieldData.h @@ -121,6 +121,14 @@ class FieldData : public FieldDataImpl { } }; +template 
<> +class FieldData : public FieldDataSparseVectorImpl { + public: + explicit FieldData(DataType data_type, int64_t buffered_num_rows = 0) + : FieldDataSparseVectorImpl(data_type, buffered_num_rows) { + } +}; + using FieldDataPtr = std::shared_ptr; using FieldDataChannel = Channel; using FieldDataChannelPtr = std::shared_ptr; diff --git a/internal/core/src/common/FieldDataInterface.h b/internal/core/src/common/FieldDataInterface.h index f23d8f57f78ba..f5ce6a4299e4c 100644 --- a/internal/core/src/common/FieldDataInterface.h +++ b/internal/core/src/common/FieldDataInterface.h @@ -32,6 +32,7 @@ #include "common/VectorTrait.h" #include "common/EasyAssert.h" #include "common/Array.h" +#include "knowhere/dataset.h" namespace milvus { @@ -43,24 +44,33 @@ class FieldDataBase { } virtual ~FieldDataBase() = default; + // For all FieldDataImpl subclasses, source is a pointer to element_count of + // Type virtual void FillFieldData(const void* source, ssize_t element_count) = 0; virtual void FillFieldData(const std::shared_ptr array) = 0; + // For all FieldDataImpl subclasses, this method returns Type* that points + // at all rows in this field data. virtual void* Data() = 0; + // For all FieldDataImpl subclasses, this method returns a Type* that points + // at the offset-th row of this field data. virtual const void* RawValue(ssize_t offset) const = 0; + // Returns the serialized bytes size of all rows. virtual int64_t Size() const = 0; + // Returns the serialized bytes size of the index-th row. 
virtual int64_t Size(ssize_t index) const = 0; + // Number of filled rows virtual size_t Length() const = 0; @@ -71,9 +81,11 @@ class FieldDataBase { Reserve(size_t cap) = 0; public: + // row capacity virtual int64_t get_num_rows() const = 0; + // each row is represented as how many Type elements virtual int64_t get_dim() const = 0; @@ -86,11 +98,9 @@ class FieldDataBase { const DataType data_type_; }; -template +template class FieldDataImpl : public FieldDataBase { public: - // constants - using Chunk = FixedVector; FieldDataImpl(FieldDataImpl&&) = delete; FieldDataImpl(const FieldDataImpl&) = delete; @@ -105,13 +115,16 @@ class FieldDataImpl : public FieldDataBase { int64_t buffered_num_rows = 0) : FieldDataBase(data_type), num_rows_(buffered_num_rows), - dim_(is_scalar ? 1 : dim) { + dim_(is_type_entire_row ? 1 : dim) { field_data_.resize(num_rows_ * dim_); } - explicit FieldDataImpl(size_t dim, DataType type, Chunk&& field_data) - : FieldDataBase(type), dim_(is_scalar ? 1 : dim) { + explicit FieldDataImpl(size_t dim, + DataType type, + FixedVector&& field_data) + : FieldDataBase(type), dim_(is_type_entire_row ? 
1 : dim) { field_data_ = std::move(field_data); + Assert(field_data.size() % dim == 0); num_rows_ = field_data.size() / dim; } @@ -122,10 +135,18 @@ class FieldDataImpl : public FieldDataBase { FillFieldData(const std::shared_ptr array) override; virtual void - FillFieldData(const std::shared_ptr& array){}; + FillFieldData(const std::shared_ptr& array) { + PanicInfo(NotImplemented, + "FillFieldData(const std::shared_ptr& " + "array) not implemented by default"); + } virtual void - FillFieldData(const std::shared_ptr& array){}; + FillFieldData(const std::shared_ptr& array) { + PanicInfo(NotImplemented, + "FillFieldData(const std::shared_ptr& " + "array) not implemented by default"); + } std::string GetName() const { @@ -209,9 +230,11 @@ class FieldDataImpl : public FieldDataBase { } protected: - Chunk field_data_; + FixedVector field_data_; + // number of elements field_data_ can hold int64_t num_rows_; mutable std::shared_mutex num_rows_mutex_; + // number of actual elements in field_data_ size_t length_{}; mutable std::shared_mutex tell_mutex_; @@ -322,6 +345,89 @@ class FieldDataJsonImpl : public FieldDataImpl { } }; +class FieldDataSparseVectorImpl + : public FieldDataImpl, true> { + public: + explicit FieldDataSparseVectorImpl(DataType data_type, + int64_t total_num_rows = 0) + : FieldDataImpl, true>( + /*dim=*/1, data_type, total_num_rows), + vec_dim_(0) { + AssertInfo(data_type == DataType::VECTOR_SPARSE_FLOAT, + "invalid data type for sparse vector"); + } + + int64_t + Size() const override { + int64_t data_size = 0; + for (size_t i = 0; i < length(); ++i) { + data_size += field_data_[i].data_byte_size(); + } + return data_size; + } + + int64_t + Size(ssize_t offset) const override { + AssertInfo(offset < get_num_rows(), + "field data subscript out of range"); + AssertInfo(offset < length(), + "subscript position don't has valid value"); + return field_data_[offset].data_byte_size(); + } + + // source is a pointer to element_count of + // 
knowhere::sparse::SparseRow + void + FillFieldData(const void* source, ssize_t element_count) override { + if (element_count == 0) { + return; + } + + std::lock_guard lck(tell_mutex_); + if (length_ + element_count > get_num_rows()) { + resize_field_data(length_ + element_count); + } + auto ptr = + static_cast*>(source); + for (int64_t i = 0; i < element_count; ++i) { + auto& row = ptr[i]; + vec_dim_ = std::max(vec_dim_, row.dim()); + } + std::copy_n(ptr, element_count, field_data_.data() + length_); + length_ += element_count; + } + + // each binary in array is a knowhere::sparse::SparseRow + void + FillFieldData(const std::shared_ptr& array) override { + auto n = array->length(); + if (n == 0) { + return; + } + + std::lock_guard lck(tell_mutex_); + if (length_ + n > get_num_rows()) { + resize_field_data(length_ + n); + } + + for (int64_t i = 0; i < array->length(); ++i) { + auto view = array->GetView(i); + auto& row = field_data_[length_ + i]; + row = CopyAndWrapSparseRow(view.data(), view.size()); + vec_dim_ = std::max(vec_dim_, row.dim()); + } + length_ += n; + } + + int64_t + Dim() const { + return vec_dim_; + } + + private: + int64_t vec_dim_; +}; + class FieldDataArrayImpl : public FieldDataImpl { public: explicit FieldDataArrayImpl(DataType data_type, int64_t total_num_rows = 0) diff --git a/internal/core/src/common/FieldMeta.h b/internal/core/src/common/FieldMeta.h index e1e50ae1610ff..bb8d590fa0753 100644 --- a/internal/core/src/common/FieldMeta.h +++ b/internal/core/src/common/FieldMeta.h @@ -54,6 +54,10 @@ datatype_sizeof(DataType data_type, int dim = 1) { case DataType::VECTOR_BFLOAT16: { return sizeof(bfloat16) * dim; } + // Not supporting VECTOR_SPARSE_FLOAT here intentionally. We can't + // easily estimately the size of a sparse float vector. Caller of this + // method must handle this case themselves and must not pass + // VECTOR_SPARSE_FLOAT data_type. 
default: { throw SegcoreError(DataTypeInvalid, fmt::format("invalid type is {}", data_type)); @@ -100,6 +104,9 @@ datatype_name(DataType data_type) { case DataType::VECTOR_BFLOAT16: { return "vector_bfloat16"; } + case DataType::VECTOR_SPARSE_FLOAT: { + return "vector_sparse_float"; + } default: { PanicInfo(DataTypeInvalid, "Unsupported DataType({})", data_type); } @@ -111,7 +118,13 @@ datatype_is_vector(DataType datatype) { return datatype == DataType::VECTOR_BINARY || datatype == DataType::VECTOR_FLOAT || datatype == DataType::VECTOR_FLOAT16 || - datatype == DataType::VECTOR_BFLOAT16; + datatype == DataType::VECTOR_BFLOAT16 || + datatype == DataType::VECTOR_SPARSE_FLOAT; +} + +inline bool +datatype_is_sparse_vector(DataType datatype) { + return datatype == DataType::VECTOR_SPARSE_FLOAT; } inline bool @@ -153,6 +166,7 @@ datatype_is_variable(DataType datatype) { case DataType::STRING: case DataType::ARRAY: case DataType::JSON: + case DataType::VECTOR_SPARSE_FLOAT: return true; default: return false; @@ -217,6 +231,8 @@ class FieldMeta { Assert(datatype_is_array(type_)); } + // pass in any value for dim for sparse vector is ok as it'll never be used: + // get_dim() not allowed to be invoked on a sparse vector field. FieldMeta(const FieldName& name, FieldId id, DataType type, @@ -232,6 +248,8 @@ class FieldMeta { int64_t get_dim() const { Assert(datatype_is_vector(type_)); + // should not attempt to get dim() of a sparse vector from schema. 
+ Assert(!datatype_is_sparse_vector(type_)); Assert(vector_info_.has_value()); return vector_info_->dim_; } @@ -282,6 +300,9 @@ class FieldMeta { size_t get_sizeof() const { + AssertInfo(!datatype_is_sparse_vector(type_), + "should not attempt to get_sizeof() of a sparse vector from " + "schema"); static const size_t ARRAY_SIZE = 128; static const size_t JSON_SIZE = 512; if (is_vector()) { diff --git a/internal/core/src/common/Schema.cpp b/internal/core/src/common/Schema.cpp index fae2cb6ed41cf..831e279b49a59 100644 --- a/internal/core/src/common/Schema.cpp +++ b/internal/core/src/common/Schema.cpp @@ -54,8 +54,11 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) { auto type_map = RepeatedKeyValToMap(child.type_params()); auto index_map = RepeatedKeyValToMap(child.index_params()); - AssertInfo(type_map.count("dim"), "dim not found"); - auto dim = boost::lexical_cast(type_map.at("dim")); + int64_t dim = 0; + if (!datatype_is_sparse_vector(data_type)) { + AssertInfo(type_map.count("dim"), "dim not found"); + dim = boost::lexical_cast(type_map.at("dim")); + } if (!index_map.count("metric_type")) { schema->AddField(name, field_id, data_type, dim, std::nullopt); } else { diff --git a/internal/core/src/common/Schema.h b/internal/core/src/common/Schema.h index 71187f1004564..b1068dd650392 100644 --- a/internal/core/src/common/Schema.h +++ b/internal/core/src/common/Schema.h @@ -132,11 +132,6 @@ class Schema { return fields_.at(field_id); } - auto - get_total_sizeof() const { - return total_sizeof_; - } - FieldId get_field_id(const FieldName& field_name) const { AssertInfo(name_ids_.count(field_name), "Cannot find field_name"); @@ -181,9 +176,6 @@ class Schema { fields_.emplace(field_id, field_meta); field_ids_.emplace_back(field_id); - - auto field_sizeof = field_meta.get_sizeof(); - total_sizeof_ += field_sizeof; } private: @@ -197,7 +189,6 @@ class Schema { std::unordered_map name_ids_; // field_name -> field_id std::unordered_map 
id_names_; // field_id -> field_name - int64_t total_sizeof_ = 0; std::optional primary_field_id_opt_; }; diff --git a/internal/core/src/common/Span.h b/internal/core/src/common/Span.h index 4ab50fb99caf6..cc6cbf2b727ad 100644 --- a/internal/core/src/common/Span.h +++ b/internal/core/src/common/Span.h @@ -60,9 +60,9 @@ class Span; // TODO: refine Span to support T=FloatVector template -class Span< - T, - typename std::enable_if_t || std::is_same_v>> { +class Span || IsScalar || + std::is_same_v>> { public: using embedded_type = T; explicit Span(const T* data, int64_t row_count) diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 2c822590dd63e..c0e742a031c46 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -80,6 +80,7 @@ enum class DataType { VECTOR_FLOAT = 101, VECTOR_FLOAT16 = 102, VECTOR_BFLOAT16 = 103, + VECTOR_SPARSE_FLOAT = 104, }; using Timestamp = uint64_t; // TODO: use TiKV-like timestamp @@ -92,7 +93,7 @@ using ScalarArray = proto::schema::ScalarField; using DataArray = proto::schema::FieldData; using VectorArray = proto::schema::VectorField; using IdArray = proto::schema::IDs; -using InsertData = proto::segcore::InsertRecord; +using InsertRecordProto = proto::segcore::InsertRecord; using PkType = std::variant; inline size_t @@ -379,6 +380,9 @@ struct fmt::formatter : formatter { case milvus::DataType::VECTOR_BFLOAT16: name = "VECTOR_BFLOAT16"; break; + case milvus::DataType::VECTOR_SPARSE_FLOAT: + name = "VECTOR_SPARSE_FLOAT"; + break; } return formatter::format(name, ctx); } diff --git a/internal/core/src/common/Utils.h b/internal/core/src/common/Utils.h index 9372e4c0315d2..cd3948348ee44 100644 --- a/internal/core/src/common/Utils.h +++ b/internal/core/src/common/Utils.h @@ -31,6 +31,7 @@ #include "common/EasyAssert.h" #include "knowhere/dataset.h" #include "knowhere/expected.h" +#include "knowhere/sparse_utils.h" #include "simdjson.h" namespace milvus { @@ -213,4 +214,51 @@ 
GetCommonPrefix(const std::string& str1, const std::string& str2) { return str1.substr(0, i); } +inline knowhere::sparse::SparseRow +CopyAndWrapSparseRow(const void* data, size_t size) { + size_t num_elements = + size / knowhere::sparse::SparseRow::element_size(); + knowhere::sparse::SparseRow row(num_elements); + std::memcpy(row.data(), data, size); + // TODO(SPARSE): validate + return row; +} + +// Iterable is a list of bytes, each is a byte array representation of a single +// sparse float row. This helper function converts such byte arrays into a list +// of knowhere::sparse::SparseRow. The resulting list is a deep copy of +// the source data. +template +std::unique_ptr[]> +SparseBytesToRows(const Iterable& rows) { + AssertInfo(rows.size() > 0, "at least 1 sparse row should be provided"); + auto res = + std::make_unique[]>(rows.size()); + for (size_t i = 0; i < rows.size(); ++i) { + res[i] = + std::move(CopyAndWrapSparseRow(rows[i].data(), rows[i].size())); + } + return res; +} + +// SparseRowsToProto converts a vector of knowhere::sparse::SparseRow to +// a milvus::proto::schema::SparseFloatArray. The resulting proto is a deep copy +// of the source data. 
+inline void SparseRowsToProto(const knowhere::sparse::SparseRow* source, + int64_t rows, + milvus::proto::schema::SparseFloatArray* proto) { + int64_t max_dim = 0; + for (size_t i = 0; i < rows; ++i) { + if (source + i == nullptr) { + // empty row + proto->add_contents(); + continue; + } + auto& row = source[i]; + max_dim = std::max(max_dim, row.dim()); + proto->add_contents(row.data(), row.data_byte_size()); + } + proto->set_dim(max_dim); +} + } // namespace milvus diff --git a/internal/core/src/common/VectorTrait.h b/internal/core/src/common/VectorTrait.h index 28481e09881ee..8062910ce2f43 100644 --- a/internal/core/src/common/VectorTrait.h +++ b/internal/core/src/common/VectorTrait.h @@ -48,20 +48,11 @@ class BFloat16Vector : public VectorTrait { static constexpr auto metric_type = DataType::VECTOR_BFLOAT16; }; -template -inline constexpr int64_t -element_sizeof(int64_t dim) { - static_assert(std::is_base_of_v); - if constexpr (std::is_same_v) { - return dim * sizeof(float); - } else if constexpr (std::is_same_v) { - return dim * sizeof(float16); - } else if constexpr (std::is_same_v) { - return dim * sizeof(bfloat16); - } else { - return dim / 8; - } -} +class SparseFloatVector : public VectorTrait { + public: + using embedded_type = float; + static constexpr auto metric_type = DataType::VECTOR_SPARSE_FLOAT; +}; template constexpr bool IsVector = std::is_base_of_v; @@ -73,6 +64,10 @@ constexpr bool IsScalar = std::is_same_v || std::is_same_v || std::is_same_v; +template +constexpr bool IsSparse = std::is_same_v || + std::is_same_v>; + template struct EmbeddedTypeImpl; @@ -86,11 +81,15 @@ struct EmbeddedTypeImpl>> { using type = std::conditional_t< std::is_same_v, float, - std::conditional_t, - float16, - std::conditional_t, - bfloat16, - uint8_t>>>; + std::conditional_t< + std::is_same_v, + float16, + std::conditional_t< + std::is_same_v, + bfloat16, + std::conditional_t, + void, + uint8_t>>>>; }; template diff --git a/internal/core/src/common/type_c.h 
b/internal/core/src/common/type_c.h index 14aa83017e760..acdf68a7bd3a2 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -52,6 +52,7 @@ enum CDataType { FloatVector = 101, Float16Vector = 102, BFloat16Vector = 103, + SparseFloatVector = 104, }; typedef enum CDataType CDataType; diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index c0df7bd3bb625..ac8cb432e5145 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -205,7 +205,8 @@ IndexFactory::CreateVectorIndex( } } else { // create mem index switch (data_type) { - case DataType::VECTOR_FLOAT: { + case DataType::VECTOR_FLOAT: + case DataType::VECTOR_SPARSE_FLOAT: { return std::make_unique>( index_type, metric_type, version, file_manager_context); } @@ -311,7 +312,8 @@ IndexFactory::CreateVectorIndex( } } else { // create mem index switch (data_type) { - case DataType::VECTOR_FLOAT: { + case DataType::VECTOR_FLOAT: + case DataType::VECTOR_SPARSE_FLOAT: { return std::make_unique>( create_index_info, file_manager_context, space); } diff --git a/internal/core/src/index/Utils.cpp b/internal/core/src/index/Utils.cpp index b1d889e171bf7..a9ad1cf1a0d91 100644 --- a/internal/core/src/index/Utils.cpp +++ b/internal/core/src/index/Utils.cpp @@ -68,6 +68,30 @@ unsupported_index_combinations() { static std::vector> ret{ std::make_tuple(knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, knowhere::metric::L2), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::L2), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::COSINE), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::HAMMING), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::JACCARD), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::SUBSTRUCTURE), + 
std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::SUPERSTRUCTURE), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::L2), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::COSINE), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::HAMMING), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::JACCARD), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::SUBSTRUCTURE), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::SUPERSTRUCTURE), }; return ret; } diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp index 28ad4eb93f4da..f37ae18acc796 100644 --- a/internal/core/src/index/VectorMemIndex.cpp +++ b/internal/core/src/index/VectorMemIndex.cpp @@ -483,37 +483,68 @@ VectorMemIndex::Build(const Config& config) { auto insert_files = GetValueFromConfig>(config, "insert_files"); AssertInfo(insert_files.has_value(), - "insert file paths is empty when build disk ann index"); + "insert file paths is empty when building in memory index"); auto field_datas = file_manager_->CacheRawDataToMemory(insert_files.value()); - int64_t total_size = 0; - int64_t total_num_rows = 0; - int64_t dim = 0; - for (auto data : field_datas) { - total_size += data->Size(); - total_num_rows += data->get_num_rows(); - AssertInfo(dim == 0 || dim == data->get_dim(), - "inconsistent dim value between field datas!"); - dim = data->get_dim(); - } - - auto buf = std::shared_ptr(new uint8_t[total_size]); - int64_t offset = 0; - for (auto data : field_datas) { - std::memcpy(buf.get() + offset, data->Data(), data->Size()); - offset += data->Size(); - data.reset(); - } - field_datas.clear(); - Config build_config; build_config.update(config); build_config.erase("insert_files"); build_config.erase(VEC_OPT_FIELDS); + if (GetIndexType().find("SPARSE") == 
std::string::npos) { + int64_t total_size = 0; + int64_t total_num_rows = 0; + int64_t dim = 0; + for (auto data : field_datas) { + total_size += data->Size(); + total_num_rows += data->get_num_rows(); + AssertInfo(dim == 0 || dim == data->get_dim(), + "inconsistent dim value between field datas!"); + dim = data->get_dim(); + } - auto dataset = GenDataset(total_num_rows, dim, buf.get()); - BuildWithDataset(dataset, build_config); + auto buf = std::shared_ptr(new uint8_t[total_size]); + int64_t offset = 0; + // TODO: avoid copying + for (auto data : field_datas) { + std::memcpy(buf.get() + offset, data->Data(), data->Size()); + offset += data->Size(); + data.reset(); + } + field_datas.clear(); + + auto dataset = GenDataset(total_num_rows, dim, buf.get()); + BuildWithDataset(dataset, build_config); + } else { + // sparse + int64_t total_rows = 0; + int64_t dim = 0; + for (auto field_data : field_datas) { + total_rows += field_data->Length(); + dim = std::max( + dim, + std::dynamic_pointer_cast>( + field_data) + ->Dim()); + } + std::vector> vec(total_rows); + int64_t offset = 0; + for (auto field_data : field_datas) { + auto ptr = static_cast*>( + field_data->Data()); + AssertInfo(ptr, "failed to cast field data to sparse rows"); + for (size_t i = 0; i < field_data->Length(); ++i) { + // this does a deep copy of field_data's data. + // TODO: avoid copying by enforcing field data to give up + // ownership. 
+ vec[offset + i] = ptr[i]; + } + offset += field_data->Length(); + } + auto dataset = GenDataset(total_rows, dim, vec.data()); + dataset->SetIsSparse(true); + BuildWithDataset(dataset, build_config); + } } template diff --git a/internal/core/src/indexbuilder/IndexFactory.h b/internal/core/src/indexbuilder/IndexFactory.h index 3b6e6874de9cd..cd361499b4065 100644 --- a/internal/core/src/indexbuilder/IndexFactory.h +++ b/internal/core/src/indexbuilder/IndexFactory.h @@ -66,6 +66,7 @@ class IndexFactory { case DataType::VECTOR_FLOAT16: case DataType::VECTOR_BFLOAT16: case DataType::VECTOR_BINARY: + case DataType::VECTOR_SPARSE_FLOAT: return std::make_unique(type, config, context); default: throw SegcoreError( @@ -101,6 +102,7 @@ class IndexFactory { case DataType::VECTOR_BINARY: case DataType::VECTOR_FLOAT16: case DataType::VECTOR_BFLOAT16: + case DataType::VECTOR_SPARSE_FLOAT: return std::make_unique( type, field_name, dim, config, file_manager_context, space); default: diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index b921118586710..1beba763b7bdf 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -351,6 +351,32 @@ BuildBinaryVecIndex(CIndex index, int64_t data_size, const uint8_t* vectors) { return status; } +CStatus +BuildSparseFloatVecIndex(CIndex index, + int64_t row_num, + int64_t dim, + const uint8_t* vectors) { + auto status = CStatus(); + try { + AssertInfo( + index, + "failed to build sparse float vector index, passed index was null"); + auto real_index = + reinterpret_cast(index); + auto cIndex = + dynamic_cast(real_index); + auto ds = knowhere::GenDataSet(row_num, dim, vectors); + ds->SetIsSparse(true); + cIndex->Build(ds); + status.error_code = Success; + status.error_msg = ""; + } catch (std::exception& e) { + status.error_code = UnexpectedError; + status.error_msg = strdup(e.what()); + } + return status; +} + // field_data: // 1, serialized 
proto::schema::BoolArray, if type is bool; // 2, serialized proto::schema::StringArray, if type is string; diff --git a/internal/core/src/indexbuilder/index_c.h b/internal/core/src/indexbuilder/index_c.h index d13e121737efc..16cd76e4531ce 100644 --- a/internal/core/src/indexbuilder/index_c.h +++ b/internal/core/src/indexbuilder/index_c.h @@ -20,6 +20,7 @@ extern "C" { #include "common/binary_set_c.h" #include "indexbuilder/type_c.h" +// used only in test CStatus CreateIndexV0(enum CDataType dtype, const char* serialized_type_params, @@ -43,6 +44,13 @@ BuildFloat16VecIndex(CIndex index, int64_t data_size, const uint8_t* vectors); CStatus BuildBFloat16VecIndex(CIndex index, int64_t data_size, const uint8_t* vectors); + +CStatus +BuildSparseFloatVecIndex(CIndex index, + int64_t row_num, + int64_t dim, + const uint8_t* vectors); + // field_data: // 1, serialized proto::schema::BoolArray, if type is bool; // 2, serialized proto::schema::StringArray, if type is string; diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index ce7c085b63c3b..0bed86c02da8e 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -34,6 +34,10 @@ #include "fmt/format.h" #include "log/Log.h" #include "mmap/Utils.h" +#include "common/FieldData.h" +#include "common/FieldDataInterface.h" +#include "common/Array.h" +#include "knowhere/dataset.h" namespace milvus { @@ -134,7 +138,7 @@ class ColumnBase { column.size_ = 0; } - const char* + virtual const char* Data() const { return data_; } @@ -144,14 +148,14 @@ class ColumnBase { return num_rows_; }; - const size_t + virtual size_t ByteSize() const { return cap_size_ + padding_; } // The capacity of the column, - // DO NOT call this for variable length column. - size_t + // DO NOT call this for variable length column(including SparseFloatColumn). 
+ virtual size_t Capacity() const { return cap_size_ / type_size_; } @@ -159,8 +163,8 @@ class ColumnBase { virtual SpanBase Span() const = 0; - void - AppendBatch(const FieldDataPtr& data) { + virtual void + AppendBatch(const FieldDataPtr data) { size_t required_size = size_ + data->Size(); if (required_size > cap_size_) { Expand(required_size * 2 + padding_); @@ -174,7 +178,7 @@ class ColumnBase { } // Append one row - void + virtual void Append(const char* data, size_t size) { size_t required_size = size_ + size; if (required_size > cap_size_) { @@ -263,6 +267,80 @@ class Column : public ColumnBase { } }; +// mmap not yet supported, thus SparseFloatColumn is not using fields in super +// class such as ColumnBase::data. +class SparseFloatColumn : public ColumnBase { + public: + // memory mode ctor + SparseFloatColumn(const FieldMeta& field_meta) : ColumnBase(0, field_meta) { + } + // mmap mode ctor + SparseFloatColumn(const File& file, + size_t size, + const FieldMeta& field_meta) + : ColumnBase(file, size, field_meta) { + AssertInfo(false, "SparseFloatColumn mmap mode not supported"); + } + + SparseFloatColumn(SparseFloatColumn&& column) noexcept + : ColumnBase(std::move(column)), + dim_(column.dim_), + vec_(std::move(column.vec_)) { + } + + ~SparseFloatColumn() override = default; + + const char* + Data() const override { + return static_cast(static_cast(vec_.data())); + } + + // This is used to advice mmap prefetch, we don't currently support mmap for + // sparse float vector thus not implemented for now. 
+ size_t + ByteSize() const override { + throw std::runtime_error( + "ByteSize not supported for sparse float column"); + } + + size_t + Capacity() const override { + throw std::runtime_error( + "Capacity not supported for sparse float column"); + } + + SpanBase + Span() const override { + throw std::runtime_error("Span not supported for sparse float column"); + } + + void + AppendBatch(const FieldDataPtr data) override { + auto ptr = static_cast*>( + data->Data()); + vec_.insert(vec_.end(), ptr, ptr + data->Length()); + for (size_t i = 0; i < data->Length(); ++i) { + dim_ = std::max(dim_, ptr[i].dim()); + } + num_rows_ += data->Length(); + } + + void + Append(const char* data, size_t size) override { + throw std::runtime_error( + "Append not supported for sparse float column"); + } + + int64_t + Dim() const { + return dim_; + } + + private: + int64_t dim_ = 0; + std::vector> vec_; +}; + template class VariableColumn : public ColumnBase { public: diff --git a/internal/core/src/mmap/Utils.h b/internal/core/src/mmap/Utils.h index 7b1dc8308d725..94219e4fdd8a5 100644 --- a/internal/core/src/mmap/Utils.h +++ b/internal/core/src/mmap/Utils.h @@ -31,52 +31,6 @@ namespace milvus { -inline size_t -GetDataSize(const std::vector& datas) { - size_t total_size{0}; - for (const auto& data : datas) { - total_size += data->Size(); - } - - return total_size; -} - -inline void* -FillField(DataType data_type, const FieldDataPtr& data, void* dst) { - char* dest = reinterpret_cast(dst); - if (datatype_is_variable(data_type)) { - switch (data_type) { - case DataType::STRING: - case DataType::VARCHAR: { - for (ssize_t i = 0; i < data->get_num_rows(); ++i) { - auto str = - static_cast(data->RawValue(i)); - memcpy(dest, str->data(), str->size()); - dest += str->size(); - } - break; - } - case DataType::JSON: { - for (ssize_t i = 0; i < data->get_num_rows(); ++i) { - auto padded_string = - static_cast(data->RawValue(i))->data(); - memcpy(dest, padded_string.data(), padded_string.size()); - 
dest += padded_string.size(); - } - break; - } - default: - PanicInfo( - DataTypeInvalid, "not supported data type {}", data_type); - } - } else { - memcpy(dst, data->Data(), data->Size()); - dest += data->Size(); - } - - return dest; -} - inline size_t WriteFieldData(File& file, DataType data_type, @@ -124,6 +78,12 @@ WriteFieldData(File& file, } break; } + case DataType::VECTOR_SPARSE_FLOAT: { + // TODO(SPARSE): this is for mmap to write data to disk so that + // the file can be mmaped into memory. + throw std::runtime_error( + "WriteFieldData for VECTOR_SPARSE_FLOAT not implemented"); + } default: PanicInfo(DataTypeInvalid, "not supported data type {}", diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index 4719a860d17a3..9986882e7a0d3 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -24,7 +24,6 @@ set(SEGCORE_FILES SegmentGrowingImpl.cpp SegmentSealedImpl.cpp FieldIndexing.cpp - InsertRecord.cpp Reduce.cpp metrics_c.cpp plan_c.cpp @@ -35,7 +34,6 @@ set(SEGCORE_FILES SegcoreConfig.cpp IndexConfigGenerator.cpp segcore_init_c.cpp - ScalarIndex.cpp TimestampIndex.cpp Utils.cpp ConcurrentVector.cpp) diff --git a/internal/core/src/segcore/ConcurrentVector.cpp b/internal/core/src/segcore/ConcurrentVector.cpp index 3bf9f4ee20b75..0fc665d303ab6 100644 --- a/internal/core/src/segcore/ConcurrentVector.cpp +++ b/internal/core/src/segcore/ConcurrentVector.cpp @@ -36,8 +36,16 @@ VectorBase::set_data_raw(ssize_t element_offset, } else if (field_meta.get_data_type() == DataType::VECTOR_BFLOAT16) { return set_data_raw( element_offset, VEC_FIELD_DATA(data, bfloat16), element_count); + } else if (field_meta.get_data_type() == + DataType::VECTOR_SPARSE_FLOAT) { + return set_data_raw( + element_offset, + SparseBytesToRows( + data->vectors().sparse_float_vector().contents()) + .get(), + element_count); } else { - PanicInfo(DataTypeInvalid, "unsupported"); + 
PanicInfo(DataTypeInvalid, "unsupported vector type"); } } diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index cdbac4e1971db..05287460d9d81 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -93,9 +93,6 @@ class VectorBase { } virtual ~VectorBase() = default; - virtual void - grow_to_at_least(int64_t element_count) = 0; - virtual void set_data_raw(ssize_t element_offset, const void* source, @@ -105,12 +102,13 @@ class VectorBase { set_data_raw(ssize_t element_offset, const std::vector& data) = 0; - void + virtual void set_data_raw(ssize_t element_offset, ssize_t element_count, const DataArray* data, const FieldMeta& field_meta); + // used only by sealed segment to load system field virtual void fill_chunk_data(const std::vector& data) = 0; @@ -135,7 +133,7 @@ class VectorBase { const int64_t size_per_chunk_; }; -template +template class ConcurrentVectorImpl : public VectorBase { public: // constants @@ -149,7 +147,7 @@ class ConcurrentVectorImpl : public VectorBase { operator=(const ConcurrentVectorImpl&) = delete; using TraitType = std::conditional_t< - is_scalar, + is_type_entire_row, Type, std::conditional_t< std::is_same_v, @@ -162,27 +160,16 @@ class ConcurrentVectorImpl : public VectorBase { BinaryVector>>>>; public: - explicit ConcurrentVectorImpl(ssize_t dim, int64_t size_per_chunk) - : VectorBase(size_per_chunk), Dim(is_scalar ? 1 : dim) { - // Assert(is_scalar ? 
dim == 1 : dim != 1); - } - - void - grow_to_at_least(int64_t element_count) override { - auto chunk_count = upper_div(element_count, size_per_chunk_); - chunks_.emplace_to_at_least(chunk_count, Dim * size_per_chunk_); - } - - void - grow_on_demand(int64_t element_count) { - auto chunk_count = upper_div(element_count, size_per_chunk_); - chunks_.emplace_to_at_least(chunk_count, Dim * element_count); + explicit ConcurrentVectorImpl(ssize_t elements_per_row, + int64_t size_per_chunk) + : VectorBase(size_per_chunk), + elements_per_row_(is_type_entire_row ? 1 : elements_per_row) { } Span get_span(int64_t chunk_id) const { auto& chunk = get_chunk(chunk_id); - if constexpr (is_scalar) { + if constexpr (is_type_entire_row) { return Span(chunk.data(), chunk.size()); } else if constexpr (std::is_same_v || // NOLINT std::is_same_v) { @@ -191,7 +178,8 @@ class ConcurrentVectorImpl : public VectorBase { } else { static_assert( std::is_same_v); - return Span(chunk.data(), chunk.size(), Dim); + return Span( + chunk.data(), chunk.size(), elements_per_row_); } } @@ -201,15 +189,14 @@ class ConcurrentVectorImpl : public VectorBase { } void - fill_chunk_data(const std::vector& datas) - override { // used only for sealed segment - AssertInfo(chunks_.size() == 0, "no empty concurrent vector"); + fill_chunk_data(const std::vector& datas) override { + AssertInfo(chunks_.size() == 0, "non empty concurrent vector"); int64_t element_count = 0; for (auto& field_data : datas) { element_count += field_data->get_num_rows(); } - chunks_.emplace_to_at_least(1, Dim * element_count); + chunks_.emplace_to_at_least(1, elements_per_row_ * element_count); int64_t offset = 0; for (auto& field_data : datas) { auto num_rows = field_data->get_num_rows(); @@ -236,47 +223,13 @@ class ConcurrentVectorImpl : public VectorBase { if (element_count == 0) { return; } - this->grow_to_at_least(element_offset + element_count); + chunks_.emplace_to_at_least( + upper_div(element_offset + element_count, 
size_per_chunk_), + elements_per_row_ * size_per_chunk_); set_data( element_offset, static_cast(source), element_count); } - void - set_data(ssize_t element_offset, - const Type* source, - ssize_t element_count) { - auto chunk_id = element_offset / size_per_chunk_; - auto chunk_offset = element_offset % size_per_chunk_; - ssize_t source_offset = 0; - // first partition: - if (chunk_offset + element_count <= size_per_chunk_) { - // only first - fill_chunk( - chunk_id, chunk_offset, element_count, source, source_offset); - return; - } - - auto first_size = size_per_chunk_ - chunk_offset; - fill_chunk(chunk_id, chunk_offset, first_size, source, source_offset); - - source_offset += size_per_chunk_ - chunk_offset; - element_count -= first_size; - ++chunk_id; - - // the middle - while (element_count >= size_per_chunk_) { - fill_chunk(chunk_id, 0, size_per_chunk_, source, source_offset); - source_offset += size_per_chunk_; - element_count -= size_per_chunk_; - ++chunk_id; - } - - // the final - if (element_count > 0) { - fill_chunk(chunk_id, 0, element_count, source, source_offset); - } - } - const Chunk& get_chunk(ssize_t chunk_index) const { return chunks_[chunk_index]; @@ -297,13 +250,16 @@ class ConcurrentVectorImpl : public VectorBase { get_element(ssize_t element_index) const { auto chunk_id = element_index / size_per_chunk_; auto chunk_offset = element_index % size_per_chunk_; - return get_chunk(chunk_id).data() + chunk_offset * Dim; + return get_chunk(chunk_id).data() + chunk_offset * elements_per_row_; } const Type& operator[](ssize_t element_index) const { - AssertInfo(Dim == 1, - fmt::format("The value of Dim is not 1, Dim={}", Dim)); + AssertInfo( + elements_per_row_ == 1, + fmt::format( + "The value of elements_per_row_ is not 1, elements_per_row_={}", + elements_per_row_)); auto chunk_id = element_index / size_per_chunk_; auto chunk_offset = element_index % size_per_chunk_; return get_chunk(chunk_id)[chunk_offset]; @@ -331,6 +287,42 @@ class 
ConcurrentVectorImpl : public VectorBase { } private: + void + set_data(ssize_t element_offset, + const Type* source, + ssize_t element_count) { + auto chunk_id = element_offset / size_per_chunk_; + auto chunk_offset = element_offset % size_per_chunk_; + ssize_t source_offset = 0; + // first partition: + if (chunk_offset + element_count <= size_per_chunk_) { + // only first + fill_chunk( + chunk_id, chunk_offset, element_count, source, source_offset); + return; + } + + auto first_size = size_per_chunk_ - chunk_offset; + fill_chunk(chunk_id, chunk_offset, first_size, source, source_offset); + + source_offset += size_per_chunk_ - chunk_offset; + element_count -= first_size; + ++chunk_id; + + // the middle + while (element_count >= size_per_chunk_) { + fill_chunk(chunk_id, 0, size_per_chunk_, source, source_offset); + source_offset += size_per_chunk_; + element_count -= size_per_chunk_; + ++chunk_id; + } + + // the final + if (element_count > 0) { + fill_chunk(chunk_id, 0, element_count, source, source_offset); + } + } + void fill_chunk(ssize_t chunk_id, ssize_t chunk_offset, @@ -349,12 +341,12 @@ class ConcurrentVectorImpl : public VectorBase { Chunk& chunk = chunks_[chunk_id]; auto ptr = chunk.data(); - std::copy_n(source + source_offset * Dim, - element_count * Dim, - ptr + chunk_offset * Dim); + std::copy_n(source + source_offset * elements_per_row_, + element_count * elements_per_row_, + ptr + chunk_offset * elements_per_row_); } - const ssize_t Dim; + const ssize_t elements_per_row_; private: ThreadSafeVector chunks_; @@ -370,6 +362,40 @@ class ConcurrentVector : public ConcurrentVectorImpl { } }; +template <> +class ConcurrentVector + : public ConcurrentVectorImpl, true> { + public: + explicit ConcurrentVector(int64_t size_per_chunk) + : ConcurrentVectorImpl, + true>::ConcurrentVectorImpl(1, size_per_chunk), + dim_(0) { + } + + void + set_data_raw(ssize_t element_offset, + const void* source, + ssize_t element_count) override { + auto* src = + 
static_cast*>(source); + for (int i = 0; i < element_count; ++i) { + dim_ = std::max(dim_, src[i].dim()); + } + ConcurrentVectorImpl, + true>::set_data_raw(element_offset, + source, + element_count); + } + + int64_t + Dim() const { + return dim_; + } + + private: + int64_t dim_; +}; + template <> class ConcurrentVector : public ConcurrentVectorImpl { diff --git a/internal/core/src/segcore/FieldIndexing.cpp b/internal/core/src/segcore/FieldIndexing.cpp index 09af62b6e1596..7455346080ed1 100644 --- a/internal/core/src/segcore/FieldIndexing.cpp +++ b/internal/core/src/segcore/FieldIndexing.cpp @@ -11,6 +11,7 @@ #include #include + #include "common/EasyAssert.h" #include "fmt/format.h" #include "index/ScalarIndexSort.h" @@ -29,8 +30,8 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta, int64_t segment_max_row_count, const SegcoreConfig& segcore_config) : FieldIndexing(field_meta, segcore_config), - build(false), - sync_with_index(false), + built_(false), + sync_with_index_(false), config_(std::make_unique(segment_max_row_count, field_index_meta, segcore_config, @@ -45,6 +46,7 @@ void VectorFieldIndexing::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { + // No BuildIndexRange support for sparse vector. 
AssertInfo(field_meta_.get_data_type() == DataType::VECTOR_FLOAT, "Data type of vector field is not VECTOR_FLOAT"); auto dim = field_meta_.get_dim(); @@ -85,13 +87,65 @@ VectorFieldIndexing::GetDataFromIndex(const int64_t* seg_offsets, } void -VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, - int64_t size, - const VectorBase* field_raw_data, - const void* data_source) { +VectorFieldIndexing::AppendSegmentIndexSparse(int64_t reserved_offset, + int64_t size, + int64_t new_data_dim, + const VectorBase* field_raw_data, + const void* data_source) { + auto conf = get_build_params(); + auto source = dynamic_cast*>( + field_raw_data); + AssertInfo(source, + "field_raw_data can't cast to " + "ConcurrentVector type"); + AssertInfo(size > 0, "append 0 sparse rows to index is not allowed"); + if (!built_) { + AssertInfo(!sync_with_index_, "index marked synced before built"); + idx_t total_rows = reserved_offset + size; + idx_t chunk_id = 0; + auto dim = source->Dim(); + + while (total_rows > 0) { + auto mat = static_cast*>( + source->get_chunk_data(chunk_id)); + auto rows = std::min(source->get_size_per_chunk(), total_rows); + auto dataset = knowhere::GenDataSet(rows, dim, mat); + dataset->SetIsSparse(true); + try { + if (chunk_id == 0) { + index_->BuildWithDataset(dataset, conf); + } else { + index_->AddWithDataset(dataset, conf); + } + } catch (SegcoreError& error) { + LOG_ERROR("growing sparse index build error: {}", error.what()); + return; + } + index_cur_.fetch_add(rows); + total_rows -= rows; + chunk_id++; + } + built_ = true; + sync_with_index_ = true; + // if not built_, new rows in data_source have already been added to + // source(ConcurrentVector) and thus added to the + // index, thus no need to add again. 
+ return; + } + + auto dataset = knowhere::GenDataSet(size, new_data_dim, data_source); + dataset->SetIsSparse(true); + index_->AddWithDataset(dataset, conf); + index_cur_.fetch_add(size); +} + +void +VectorFieldIndexing::AppendSegmentIndexDense(int64_t reserved_offset, + int64_t size, + const VectorBase* field_raw_data, + const void* data_source) { AssertInfo(field_meta_.get_data_type() == DataType::VECTOR_FLOAT, "Data type of vector field is not VECTOR_FLOAT"); - auto dim = field_meta_.get_dim(); auto conf = get_build_params(); auto source = @@ -100,8 +154,9 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, auto size_per_chunk = source->get_size_per_chunk(); //append vector [vector_id_beg, vector_id_end] into index //build index [vector_id_beg, build_threshold) when index not exist - if (!build) { + if (!built_) { idx_t vector_id_beg = index_cur_.load(); + Assert(vector_id_beg == 0); idx_t vector_id_end = get_build_threshold() - 1; auto chunk_id_beg = vector_id_beg / size_per_chunk; auto chunk_id_end = vector_id_end / size_per_chunk; @@ -143,7 +198,7 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, return; } index_cur_.fetch_add(vec_num); - build = true; + built_ = true; } //append rest data when index has built idx_t vector_id_beg = index_cur_.load(); @@ -153,11 +208,12 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, int64_t vec_num = vector_id_end - vector_id_beg + 1; if (vec_num <= 0) { - sync_with_index.store(true); + sync_with_index_.store(true); return; } - if (sync_with_index.load()) { + if (sync_with_index_.load()) { + Assert(size == vec_num); auto dataset = knowhere::GenDataSet(vec_num, dim, data_source); index_->AddWithDataset(dataset, conf); index_cur_.fetch_add(vec_num); @@ -179,7 +235,7 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, index_->AddWithDataset(dataset, conf); index_cur_.fetch_add(chunk_sz); } - sync_with_index.store(true); + sync_with_index_.store(true); } } @@ 
-188,6 +244,8 @@ VectorFieldIndexing::get_build_params() const { auto config = config_->GetBuildBaseParams(); config[knowhere::meta::DIM] = std::to_string(field_meta_.get_dim()); config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1); + // for sparse float vector: drop_ratio_build config is not allowed to be set + // on growing segment index. return config; } @@ -203,7 +261,7 @@ VectorFieldIndexing::get_index_cursor() { } bool VectorFieldIndexing::sync_data_with_index() const { - return sync_with_index.load(); + return sync_with_index_.load(); } bool @@ -243,17 +301,10 @@ CreateIndex(const FieldMeta& field_meta, int64_t segment_max_row_count, const SegcoreConfig& segcore_config) { if (field_meta.is_vector()) { - if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { - return std::make_unique(field_meta, - field_index_meta, - segment_max_row_count, - segcore_config); - } else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) { - return std::make_unique(field_meta, - field_index_meta, - segment_max_row_count, - segcore_config); - } else if (field_meta.get_data_type() == DataType::VECTOR_BFLOAT16) { + if (field_meta.get_data_type() == DataType::VECTOR_FLOAT || + field_meta.get_data_type() == DataType::VECTOR_FLOAT16 || + field_meta.get_data_type() == DataType::VECTOR_BFLOAT16 || + field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) { return std::make_unique(field_meta, field_index_meta, segment_max_row_count, diff --git a/internal/core/src/segcore/FieldIndexing.h b/internal/core/src/segcore/FieldIndexing.h index 249a4b99da83e..0033a6b051680 100644 --- a/internal/core/src/segcore/FieldIndexing.h +++ b/internal/core/src/segcore/FieldIndexing.h @@ -51,10 +51,18 @@ class FieldIndexing { const VectorBase* vec_base) = 0; virtual void - AppendSegmentIndex(int64_t reserved_offset, - int64_t size, - const VectorBase* vec_base, - const void* data_source) = 0; + AppendSegmentIndexDense(int64_t reserved_offset, + int64_t size, + const VectorBase* 
vec_base, + const void* data_source) = 0; + + // new_data_dim is the dimension of the new data being appended(data_source) + virtual void + AppendSegmentIndexSparse(int64_t reserved_offset, + int64_t size, + int64_t new_data_dim, + const VectorBase* vec_base, + const void* data_source) = 0; virtual void GetDataFromIndex(const int64_t* seg_offsets, @@ -109,12 +117,22 @@ class ScalarFieldIndexing : public FieldIndexing { const VectorBase* vec_base) override; void - AppendSegmentIndex(int64_t reserved_offset, - int64_t size, - const VectorBase* vec_base, - const void* data_source) override { + AppendSegmentIndexDense(int64_t reserved_offset, + int64_t size, + const VectorBase* vec_base, + const void* data_source) override { + PanicInfo(Unsupported, + "scalar index doesn't support append vector segment index"); + } + + void + AppendSegmentIndexSparse(int64_t reserved_offset, + int64_t size, + int64_t new_data_dim, + const VectorBase* vec_base, + const void* data_source) override { PanicInfo(Unsupported, - "scalar index don't support append segment index"); + "scalar index doesn't support append vector segment index"); } void @@ -171,10 +189,17 @@ class VectorFieldIndexing : public FieldIndexing { const VectorBase* vec_base) override; void - AppendSegmentIndex(int64_t reserved_offset, - int64_t size, - const VectorBase* field_raw_data, - const void* data_source) override; + AppendSegmentIndexDense(int64_t reserved_offset, + int64_t size, + const VectorBase* field_raw_data, + const void* data_source) override; + + void + AppendSegmentIndexSparse(int64_t reserved_offset, + int64_t size, + int64_t new_data_dim, + const VectorBase* field_raw_data, + const void* data_source) override; void GetDataFromIndex(const int64_t* seg_offsets, @@ -214,9 +239,13 @@ class VectorFieldIndexing : public FieldIndexing { get_search_params(const SearchInfo& searchInfo) const; private: + // current number of rows in index. 
std::atomic index_cur_ = 0; - std::atomic build; - std::atomic sync_with_index; + // whether the growing index has been built. + std::atomic built_; + // whether all inserted data has been added to growing index and can be + // searched. + std::atomic sync_with_index_; std::unique_ptr config_; std::unique_ptr index_; tbb::concurrent_vector> data_; @@ -283,19 +312,28 @@ class IndexingRecord { FieldId fieldId, const DataArray* stream_data, const InsertRecord& record) { - if (is_in(fieldId)) { - auto& indexing = field_indexings_.at(fieldId); - if (indexing->get_field_meta().is_vector() && - indexing->get_field_meta().get_data_type() == - DataType::VECTOR_FLOAT && - reserved_offset + size >= indexing->get_build_threshold()) { - auto field_raw_data = record.get_field_data_base(fieldId); - indexing->AppendSegmentIndex( - reserved_offset, - size, - field_raw_data, - stream_data->vectors().float_vector().data().data()); - } + if (!is_in(fieldId)) { + return; + } + auto& indexing = field_indexings_.at(fieldId); + auto type = indexing->get_field_meta().get_data_type(); + auto field_raw_data = record.get_field_data_base(fieldId); + if (type == DataType::VECTOR_FLOAT && + reserved_offset + size >= indexing->get_build_threshold()) { + indexing->AppendSegmentIndexDense( + reserved_offset, + size, + field_raw_data, + stream_data->vectors().float_vector().data().data()); + } else if (type == DataType::VECTOR_SPARSE_FLOAT) { + auto data = SparseBytesToRows( + stream_data->vectors().sparse_float_vector().contents()); + indexing->AppendSegmentIndexSparse( + reserved_offset, + size, + stream_data->vectors().sparse_float_vector().dim(), + field_raw_data, + data.get()); } } @@ -307,16 +345,28 @@ class IndexingRecord { FieldId fieldId, const FieldDataPtr data, const InsertRecord& record) { - if (is_in(fieldId)) { - auto& indexing = field_indexings_.at(fieldId); - if (indexing->get_field_meta().is_vector() && - indexing->get_field_meta().get_data_type() == - DataType::VECTOR_FLOAT && -
reserved_offset + size >= indexing->get_build_threshold()) { - auto vec_base = record.get_field_data_base(fieldId); - indexing->AppendSegmentIndex( - reserved_offset, size, vec_base, data->Data()); - } + if (!is_in(fieldId)) { + return; + } + auto& indexing = field_indexings_.at(fieldId); + auto type = indexing->get_field_meta().get_data_type(); + const void* p = data->Data(); + + if (type == DataType::VECTOR_FLOAT && + reserved_offset + size >= indexing->get_build_threshold()) { + auto vec_base = record.get_field_data_base(fieldId); + indexing->AppendSegmentIndexDense( + reserved_offset, size, vec_base, data->Data()); + } else if (type == DataType::VECTOR_SPARSE_FLOAT) { + auto vec_base = record.get_field_data_base(fieldId); + indexing->AppendSegmentIndexSparse( + reserved_offset, + size, + std::dynamic_pointer_cast>( + data) + ->Dim(), + vec_base, + p); } } @@ -396,14 +446,12 @@ class IndexingRecord { IndexMetaPtr index_meta_; const SegcoreConfig& segcore_config_; - private: // control info std::atomic resource_ack_ = 0; // std::atomic finished_ack_ = 0; AckResponder finished_ack_; std::mutex mutex_; - private: // field_offset => indexing std::map> field_indexings_; }; diff --git a/internal/core/src/segcore/IndexConfigGenerator.cpp b/internal/core/src/segcore/IndexConfigGenerator.cpp index 583b457d9221f..c9c390667237c 100644 --- a/internal/core/src/segcore/IndexConfigGenerator.cpp +++ b/internal/core/src/segcore/IndexConfigGenerator.cpp @@ -20,8 +20,23 @@ VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout, : max_index_row_count_(max_index_row_cout), config_(config) { origin_index_type_ = index_meta_.GetIndexType(); metric_type_ = index_meta_.GeMetricType(); + // Currently for dense vector index, if the segment is growing, we use IVFCC + // as the index type; if the segment is sealed but its index has not been + // built by the index node, we use IVFFLAT as the temp index type and + // release it once the index node has finished building the index and 
query + // node has loaded it. - index_type_ = support_index_types.at(segment_type); + // But for sparse vector index(INDEX_SPARSE_INVERTED_INDEX and + // INDEX_SPARSE_WAND), those index themselves can be used as the temp index + // type, so we can avoid the extra step of "release temp and load". + + if (origin_index_type_ == + knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || + origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) { + index_type_ = origin_index_type_; + } else { + index_type_ = support_index_types.at(segment_type); + } build_params_[knowhere::meta::METRIC_TYPE] = metric_type_; build_params_[knowhere::indexparam::NLIST] = std::to_string(config_.get_nlist()); @@ -29,6 +44,8 @@ VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout, std::max((int)(config_.get_chunk_rows() / config_.get_nlist()), 48)); search_params_[knowhere::indexparam::NPROBE] = std::to_string(config_.get_nprobe()); + // note for sparse vector index: drop_ratio_build is not allowed for growing + // segment index. LOG_INFO( "VecIndexConfig: origin_index_type={}, index_type={}, metric_type={}", origin_index_type_, @@ -38,6 +55,14 @@ VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout, int64_t VecIndexConfig::GetBuildThreshold() const noexcept { + // For sparse, do not impose a threshold and start using index with any + // number of rows. Unlike dense vector index, growing sparse vector index + // does not require a minimum number of rows to train.
+ if (origin_index_type_ == + knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || + origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) { + return 0; + } assert(VecIndexConfig::index_build_ratio.count(index_type_)); auto ratio = VecIndexConfig::index_build_ratio.at(index_type_); assert(ratio >= 0.0 && ratio < 1.0); diff --git a/internal/core/src/segcore/IndexConfigGenerator.h b/internal/core/src/segcore/IndexConfigGenerator.h index 563e95e4837b0..ce8c20b609538 100644 --- a/internal/core/src/segcore/IndexConfigGenerator.h +++ b/internal/core/src/segcore/IndexConfigGenerator.h @@ -27,6 +27,8 @@ enum class IndexConfigLevel { SYSTEM_ASSIGN = 3 }; +// this is the config used for generating growing index or the temp sealed index +// when the segment is sealed before the index is built. class VecIndexConfig { inline static const std::map support_index_types = {{SegmentType::Growing, knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC}, diff --git a/internal/core/src/segcore/InsertRecord.cpp b/internal/core/src/segcore/InsertRecord.cpp deleted file mode 100644 index be9cc0a85a257..0000000000000 --- a/internal/core/src/segcore/InsertRecord.cpp +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. 
See the License for the specific language governing permissions and limitations under the License - -#include "InsertRecord.h" diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index cb79020036187..1723e40910d55 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -346,6 +346,11 @@ struct InsertRecord { this->append_field_data( field_id, field_meta.get_dim(), size_per_chunk); continue; + } else if (field_meta.get_data_type() == + DataType::VECTOR_SPARSE_FLOAT) { + this->append_field_data(field_id, + size_per_chunk); + continue; } else { PanicInfo(DataTypeInvalid, fmt::format("unsupported vector type", @@ -524,8 +529,7 @@ struct InsertRecord { AssertInfo(fields_data_.find(field_id) != fields_data_.end(), "Cannot find field_data with field_id: " + std::to_string(field_id.get())); - auto ptr = fields_data_.at(field_id).get(); - return ptr; + return fields_data_.at(field_id).get(); } // get field data in given type, const version @@ -552,7 +556,7 @@ struct InsertRecord { template void append_field_data(FieldId field_id, int64_t size_per_chunk) { - static_assert(IsScalar); + static_assert(IsScalar || IsSparse); fields_data_.emplace( field_id, std::make_unique>(size_per_chunk)); } @@ -608,7 +612,6 @@ struct InsertRecord { std::unique_ptr pk2offset_; private: - // std::vector> fields_data_; std::unordered_map> fields_data_{}; mutable std::shared_mutex shared_mutex_{}; }; diff --git a/internal/core/src/segcore/ScalarIndex.cpp b/internal/core/src/segcore/ScalarIndex.cpp deleted file mode 100644 index c5aaacdd70f09..0000000000000 --- a/internal/core/src/segcore/ScalarIndex.cpp +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License - -#include "common/EasyAssert.h" -#include "ScalarIndex.h" - -namespace milvus::segcore { -std::pair, std::vector> -ScalarIndexVector::do_search_ids(const IdArray& ids) const { - auto res_ids = std::make_unique(); - // TODO: support string array - static_assert(std::is_same_v); - AssertInfo(ids.has_int_id(), "ids doesn't have int_id field"); - auto src_ids = ids.int_id(); - auto dst_ids = res_ids->mutable_int_id(); - std::vector dst_offsets; - - // TODO: a possible optimization: - // TODO: sort the input id array to make access cache friendly - - // assume no repeated key now - // TODO: support repeated key - for (auto id : src_ids.data()) { - using Pair = std::pair; - auto [iter_beg, iter_end] = - std::equal_range(mapping_.begin(), - mapping_.end(), - std::make_pair(id, SegOffset(0)), - [](const Pair& left, const Pair& right) { - return left.first < right.first; - }); - - for (auto& iter = iter_beg; iter != iter_end; iter++) { - auto [entry_id, entry_offset] = *iter; - dst_ids->add_data(entry_id); - dst_offsets.push_back(entry_offset); - } - } - return {std::move(res_ids), std::move(dst_offsets)}; -} - -std::pair, std::vector> -ScalarIndexVector::do_search_ids(const std::vector& ids) const { - std::vector dst_offsets; - std::vector dst_ids; - - for (auto id : ids) { - using Pair = std::pair; - auto [iter_beg, iter_end] = - std::equal_range(mapping_.begin(), - mapping_.end(), - std::make_pair(id, SegOffset(0)), - [](const Pair& left, const Pair& right) { - return left.first < right.first; - }); - - for (auto& iter = iter_beg; iter != iter_end; iter++) { - auto 
[entry_id, entry_offset] = *iter_beg; - dst_ids.emplace_back(entry_id); - dst_offsets.push_back(entry_offset); - } - } - return {std::move(dst_ids), std::move(dst_offsets)}; -} - -void -ScalarIndexVector::append_data(const ScalarIndexVector::T* ids, - int64_t count, - SegOffset base) { - for (int64_t i = 0; i < count; ++i) { - auto offset = base + SegOffset(i); - mapping_.emplace_back(ids[i], offset); - } -} - -void -ScalarIndexVector::build() { - std::sort(mapping_.begin(), mapping_.end()); -} -} // namespace milvus::segcore diff --git a/internal/core/src/segcore/ScalarIndex.h b/internal/core/src/segcore/ScalarIndex.h deleted file mode 100644 index ae3e846fce6a6..0000000000000 --- a/internal/core/src/segcore/ScalarIndex.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. 
See the License for the specific language governing permissions and limitations under the License - -#pragma once - -#include -#include -#include -#include - -#include "common/Types.h" -#include "pb/schema.pb.h" - -namespace milvus::segcore { - -class ScalarIndexBase { - public: - virtual std::pair, std::vector> - do_search_ids(const IdArray& ids) const = 0; - virtual std::pair, std::vector> - do_search_ids(const std::vector& ids) const = 0; - virtual ~ScalarIndexBase() = default; - virtual std::string - debug() const = 0; -}; - -class ScalarIndexVector : public ScalarIndexBase { - using T = int64_t; - - public: - // TODO: use proto::schema::ids - void - append_data(const T* ids, int64_t count, SegOffset base); - - void - build(); - - std::pair, std::vector> - do_search_ids(const IdArray& ids) const override; - - std::pair, std::vector> - do_search_ids(const std::vector& ids) const override; - - std::string - debug() const override { - std::string dbg_str; - for (auto pr : mapping_) { - dbg_str += "<" + std::to_string(pr.first) + "->" + - std::to_string(pr.second.get()) + ">"; - } - return dbg_str; - } - - private: - std::vector> mapping_; -}; - -} // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowing.h b/internal/core/src/segcore/SegmentGrowing.h index f00b3fc3fca69..5d51fe3cb52b9 100644 --- a/internal/core/src/segcore/SegmentGrowing.h +++ b/internal/core/src/segcore/SegmentGrowing.h @@ -32,7 +32,7 @@ class SegmentGrowing : public SegmentInternalInterface { int64_t size, const int64_t* row_ids, const Timestamp* timestamps, - const InsertData* insert_data) = 0; + const InsertRecordProto* insert_record_proto) = 0; SegmentType type() const override { diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index b9aca5f63968a..d78f80f303bee 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -87,15 +87,13 @@ 
SegmentGrowingImpl::Insert(int64_t reserved_offset, int64_t num_rows, const int64_t* row_ids, const Timestamp* timestamps_raw, - const InsertData* insert_data) { - AssertInfo(insert_data->num_rows() == num_rows, + const InsertRecordProto* insert_record_proto) { + AssertInfo(insert_record_proto->num_rows() == num_rows, "Entities_raw count not equal to insert size"); - // AssertInfo(insert_data->fields_data_size() == schema_->size(), - // "num fields of insert data not equal to num of schema fields"); // step 1: check insert data if valid std::unordered_map field_id_to_offset; int64_t field_offset = 0; - for (const auto& field : insert_data->fields_data()) { + for (const auto& field : insert_record_proto->fields_data()) { auto field_id = FieldId(field.field_id()); AssertInfo(!field_id_to_offset.count(field_id), "duplicate field data"); field_id_to_offset.emplace(field_id, field_offset++); @@ -122,7 +120,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, insert_record_.get_field_data_base(field_id)->set_data_raw( reserved_offset, num_rows, - &insert_data->fields_data(data_offset), + &insert_record_proto->fields_data(data_offset), field_meta); } //insert vector data into index @@ -131,13 +129,15 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, reserved_offset, num_rows, field_id, - &insert_data->fields_data(data_offset), + &insert_record_proto->fields_data(data_offset), insert_record_); } // update average row data size auto field_data_size = GetRawDataSizeOfDataArray( - &insert_data->fields_data(data_offset), field_meta, num_rows); + &insert_record_proto->fields_data(data_offset), + field_meta, + num_rows); if (datatype_is_variable(field_meta.get_data_type())) { SegmentInternalInterface::set_field_avg_size( field_id, num_rows, field_data_size); @@ -153,7 +153,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, AssertInfo(field_id.get() != INVALID_FIELD_ID, "Primary key is -1"); std::vector pks(num_rows); ParsePksFromFieldData( - pks, 
insert_data->fields_data(field_id_to_offset[field_id])); + pks, insert_record_proto->fields_data(field_id_to_offset[field_id])); for (int i = 0; i < num_rows; ++i) { insert_record_.insert_pk(pks[i], reserved_offset + i); } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 9d7a0668e9b3f..d26fb6fb14822 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -45,7 +45,7 @@ class SegmentGrowingImpl : public SegmentGrowing { int64_t size, const int64_t* row_ids, const Timestamp* timestamps, - const InsertData* insert_data) override; + const InsertRecordProto* insert_record_proto) override; bool Contain(const PkType& pk) const override { diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index cd0367cedf465..120316529c7d4 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -37,6 +37,7 @@ #include "common/FieldData.h" #include "common/Types.h" #include "log/Log.h" +#include "mmap/Utils.h" #include "pb/schema.pb.h" #include "mmap/Types.h" #include "query/ScalarIndex.h" @@ -252,7 +253,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) { field_data_info.channel->set_capacity(parallel_degree * 2); auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); - auto load_future = pool.Submit( + pool.Submit( LoadFieldDatasFromRemote, insert_files, field_data_info.channel); LOG_INFO("segment {} submits load field {} task to thread pool", @@ -272,6 +273,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) { void SegmentSealedImpl::LoadFieldDataV2(const LoadFieldDataInfo& load_info) { + // TODO(SPARSE): support storage v2 // NOTE: lock only when data is ready to avoid starvation // only one field for now, parallel load field data in golang size_t num_rows = 
storage::GetNumRowsForLoadInfo(load_info); @@ -435,6 +437,16 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { column = std::move(var_column); break; } + case milvus::DataType::VECTOR_SPARSE_FLOAT: { + auto col = std::make_shared(field_meta); + FieldDataPtr field_data; + while (data.channel->pop(field_data)) { + stats_.mem_size += field_data->Size(); + col->AppendBatch(field_data); + } + column = std::move(col); + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported data type", data_type)); @@ -566,6 +578,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { column = std::move(arr_column); break; } + // TODO(SPARSE) support mmap default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported data type {}", data_type)); @@ -1514,14 +1527,17 @@ SegmentSealedImpl::generate_binlog_index(const FieldId field_id) { auto& field_index_meta = col_index_meta_->GetFieldIndexMeta(field_id); auto& index_params = field_index_meta.GetIndexParams(); + bool is_sparse = + field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT; + auto enable_binlog_index = [&]() { // checkout config if (!segcore_config_.get_enable_interim_segment_index()) { return false; } // check data type - if (!field_meta.is_vector() || - field_meta.get_data_type() != DataType::VECTOR_FLOAT) { + if (field_meta.get_data_type() != DataType::VECTOR_FLOAT && + !is_sparse) { return false; } // check index type @@ -1546,7 +1562,7 @@ SegmentSealedImpl::generate_binlog_index(const FieldId field_id) { std::shared_lock lck(mutex_); row_count = num_rows_.value(); } - auto dim = field_meta.get_dim(); + // generate index params auto field_binlog_config = std::unique_ptr( new VecIndexConfig(row_count, @@ -1556,19 +1572,24 @@ SegmentSealedImpl::generate_binlog_index(const FieldId field_id) { if (row_count < field_binlog_config->GetBuildThreshold()) { return false; } - auto build_config = field_binlog_config->GetBuildBaseParams(); - 
build_config[knowhere::meta::DIM] = std::to_string(dim); - build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1); - auto index_metric = field_binlog_config->GetMetricType(); - std::shared_ptr vec_data{}; { std::shared_lock lck(mutex_); vec_data = fields_.at(field_id); } + auto dim = is_sparse + ? dynamic_cast(vec_data.get())->Dim() + : field_meta.get_dim(); + + auto build_config = field_binlog_config->GetBuildBaseParams(); + build_config[knowhere::meta::DIM] = std::to_string(dim); + build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1); + auto index_metric = field_binlog_config->GetMetricType(); + auto dataset = knowhere::GenDataSet(row_count, dim, (void*)vec_data->Data()); dataset->SetIsOwner(false); + dataset->SetIsSparse(is_sparse); index::IndexBasePtr vec_index = std::make_unique>( diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index c1cb987041296..713ffec7a3f54 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -24,7 +24,6 @@ #include "ConcurrentVector.h" #include "DeletedRecord.h" -#include "ScalarIndex.h" #include "SealedIndexingRecord.h" #include "SegmentSealed.h" #include "TimestampIndex.h" diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index a418c53b401c8..b05adadba9e13 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -21,8 +21,9 @@ #include "index/ScalarIndex.h" #include "log/Log.h" #include "mmap/Utils.h" -#include "storage/ThreadPool.h" +#include "common/FieldData.h" #include "storage/RemoteChunkManagerSingleton.h" +#include "common/Common.h" #include "storage/ThreadPools.h" #include "storage/Util.h" @@ -205,6 +206,11 @@ GetRawDataSizeOfDataArray(const DataArray* data, break; } + case DataType::VECTOR_SPARSE_FLOAT: { + // TODO(SPARSE, size) + result += data->vectors().sparse_float_vector().ByteSizeLong(); + break; + } 
default: { PanicInfo( DataTypeInvalid, @@ -338,6 +344,10 @@ CreateVectorDataArray(int64_t count, const FieldMeta& field_meta) { obj->resize(length * sizeof(bfloat16)); break; } + case DataType::VECTOR_SPARSE_FLOAT: { + // does nothing here + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); @@ -446,8 +456,11 @@ CreateVectorDataArrayFrom(const void* data_raw, field_meta.get_data_type())); auto vector_array = data_array->mutable_vectors(); - auto dim = field_meta.get_dim(); - vector_array->set_dim(dim); + auto dim = 0; + if (!datatype_is_sparse_vector(data_type)) { + dim = field_meta.get_dim(); + vector_array->set_dim(dim); + } switch (data_type) { case DataType::VECTOR_FLOAT: { auto length = count * dim; @@ -479,6 +492,15 @@ CreateVectorDataArrayFrom(const void* data_raw, obj->assign(data, length * sizeof(bfloat16)); break; } + case DataType::VECTOR_SPARSE_FLOAT: { + SparseRowsToProto( + reinterpret_cast*>( + data_raw), + count, + vector_array->mutable_sparse_float_vector()); + vector_array->set_dim(vector_array->sparse_float_vector().dim()); + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); @@ -534,6 +556,15 @@ MergeDataArray( auto data = VEC_FIELD_DATA(src_field_data, binary); auto obj = vector_array->mutable_binary_vector(); obj->assign(data + src_offset * num_bytes, num_bytes); + } else if (field_meta.get_data_type() == + DataType::VECTOR_SPARSE_FLOAT) { + auto src = src_field_data->vectors().sparse_float_vector(); + auto dst = vector_array->mutable_sparse_float_vector(); + if (src.dim() > dst->dim()) { + dst->set_dim(src.dim()); + } + vector_array->set_dim(dst->dim()); + *dst->mutable_contents() = src.contents(); } else { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 75f697f93f995..111294f915286 100644 --- 
a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -207,12 +207,17 @@ Insert(CSegmentInterface c_segment, const uint64_t data_info_len) { try { auto segment = static_cast(c_segment); - auto insert_data = std::make_unique(); - auto suc = insert_data->ParseFromArray(data_info, data_info_len); + auto insert_record_proto = + std::make_unique(); + auto suc = + insert_record_proto->ParseFromArray(data_info, data_info_len); AssertInfo(suc, "failed to parse insert data from records"); - segment->Insert( - reserved_offset, size, row_ids, timestamps, insert_data.get()); + segment->Insert(reserved_offset, + size, + row_ids, + timestamps, + insert_record_proto.get()); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index 1e40dfe850961..99638558ea36f 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ b/internal/core/src/storage/ChunkCache.cpp @@ -15,6 +15,7 @@ // limitations under the License. 
#include "ChunkCache.h" +#include "mmap/Utils.h" namespace milvus::storage { diff --git a/internal/core/src/storage/ChunkManager.h b/internal/core/src/storage/ChunkManager.h index 6b0cfb80915ec..9f51154ee6f69 100644 --- a/internal/core/src/storage/ChunkManager.h +++ b/internal/core/src/storage/ChunkManager.h @@ -58,7 +58,7 @@ class ChunkManager { Read(const std::string& filepath, void* buf, uint64_t len) = 0; /** - * @brief Write buffer to file with offset + * @brief Write buffer to file without offset * @param filepath * @param buf * @param len diff --git a/internal/core/src/storage/Event.cpp b/internal/core/src/storage/Event.cpp index 55ff73ced2765..dbe0f8861ab8c 100644 --- a/internal/core/src/storage/Event.cpp +++ b/internal/core/src/storage/Event.cpp @@ -215,7 +215,8 @@ std::vector BaseEventData::Serialize() { auto data_type = field_data->get_data_type(); std::shared_ptr payload_writer; - if (milvus::datatype_is_vector(data_type)) { + if (milvus::datatype_is_vector(data_type) && + data_type != DataType::VECTOR_SPARSE_FLOAT) { payload_writer = std::make_unique(data_type, field_data->get_dim()); } else { @@ -259,6 +260,18 @@ BaseEventData::Serialize() { } break; } + case DataType::VECTOR_SPARSE_FLOAT: { + for (size_t offset = 0; offset < field_data->get_num_rows(); + ++offset) { + auto row = + static_cast*>( + field_data->RawValue(offset)); + payload_writer->add_one_binary_payload( + static_cast(row->data()), + row->data_byte_size()); + } + break; + } default: { auto payload = Payload{data_type, diff --git a/internal/core/src/storage/PayloadReader.cpp b/internal/core/src/storage/PayloadReader.cpp index 2c1ae76fa72e3..0305b5ce4b271 100644 --- a/internal/core/src/storage/PayloadReader.cpp +++ b/internal/core/src/storage/PayloadReader.cpp @@ -59,7 +59,9 @@ PayloadReader::init(std::shared_ptr input) { int64_t column_index = 0; auto file_meta = arrow_reader->parquet_reader()->metadata(); - dim_ = datatype_is_vector(column_type_) + // dim is unused for sparse float 
vector + dim_ = (datatype_is_vector(column_type_) && + column_type_ != DataType::VECTOR_SPARSE_FLOAT) ? GetDimensionFromFileMetaData( file_meta->schema()->Column(column_index), column_type_) : 1; diff --git a/internal/core/src/storage/PayloadWriter.cpp b/internal/core/src/storage/PayloadWriter.cpp index 54c47ed81ea68..52551a821f90d 100644 --- a/internal/core/src/storage/PayloadWriter.cpp +++ b/internal/core/src/storage/PayloadWriter.cpp @@ -31,6 +31,9 @@ PayloadWriter::PayloadWriter(const DataType column_type) // create payload writer for vector data type PayloadWriter::PayloadWriter(const DataType column_type, int dim) : column_type_(column_type) { + AssertInfo(column_type != DataType::VECTOR_SPARSE_FLOAT, + "PayloadWriter for Sparse Float Vector should be created " + "using the constructor without dimension"); init_dimension(dim); } @@ -58,7 +61,9 @@ PayloadWriter::add_one_string_payload(const char* str, int str_size) { void PayloadWriter::add_one_binary_payload(const uint8_t* data, int length) { AssertInfo(output_ == nullptr, "payload writer has been finished"); - AssertInfo(milvus::datatype_is_binary(column_type_), "mismatch data type"); + AssertInfo(milvus::datatype_is_binary(column_type_) || + milvus::datatype_is_sparse_vector(column_type_), + "mismatch data type"); AddOneBinaryToArrowBuilder(builder_, data, length); rows_.fetch_add(1); } diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index ed2c597d2a9d8..31d5600515532 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -39,8 +39,10 @@ #include "storage/OpenDALChunkManager.h" #endif #include "storage/Types.h" -#include "storage/ThreadPools.h" #include "storage/Util.h" +#include "storage/ThreadPools.h" +#include "storage/MemFileManagerImpl.h" +#include "storage/DiskFileManagerImpl.h" namespace milvus::storage { @@ -170,6 +172,12 @@ AddPayloadToArrowBuilder(std::shared_ptr builder, add_vector_payload(builder, const_cast(raw_data), 
length); break; } + case DataType::VECTOR_SPARSE_FLOAT: { + PanicInfo(DataTypeInvalid, + "Sparse Float Vector payload should be added by calling " + "add_one_binary_payload", + data_type); + } default: { PanicInfo(DataTypeInvalid, "unsupported data type {}", data_type); } @@ -242,6 +250,10 @@ CreateArrowBuilder(DataType data_type) { case DataType::JSON: { return std::make_shared(); } + // sparse float vector doesn't require a dim + case DataType::VECTOR_SPARSE_FLOAT: { + return std::make_shared(); + } default: { PanicInfo( DataTypeInvalid, "unsupported numeric data type {}", data_type); @@ -311,6 +323,10 @@ CreateArrowSchema(DataType data_type) { case DataType::JSON: { return arrow::schema({arrow::field("val", arrow::binary())}); } + // sparse float vector doesn't require a dim + case DataType::VECTOR_SPARSE_FLOAT: { + return arrow::schema({arrow::field("val", arrow::binary())}); + } default: { PanicInfo( DataTypeInvalid, "unsupported numeric data type {}", data_type); @@ -341,6 +357,9 @@ CreateArrowSchema(DataType data_type, int dim) { return arrow::schema({arrow::field( "val", arrow::fixed_size_binary(dim * sizeof(bfloat16)))}); } + case DataType::VECTOR_SPARSE_FLOAT: { + return arrow::schema({arrow::field("val", arrow::binary())}); + } default: { PanicInfo( DataTypeInvalid, "unsupported vector data type {}", data_type); @@ -364,6 +383,11 @@ GetDimensionFromFileMetaData(const parquet::ColumnDescriptor* schema, case DataType::VECTOR_BFLOAT16: { return schema->type_length() / sizeof(bfloat16); } + case DataType::VECTOR_SPARSE_FLOAT: { + PanicInfo(DataTypeInvalid, + fmt::format("GetDimensionFromFileMetaData should not be " + "called for sparse vector")); + } default: PanicInfo(DataTypeInvalid, "unsupported data type {}", data_type); } @@ -501,11 +525,12 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager, field_data->FillFieldData(buf, element_count); auto insertData = std::make_shared(field_data); insertData->SetFieldDataMeta(field_data_meta); - auto 
serialized_index_data = insertData->serialize_to_remote_file(); - auto serialized_index_size = serialized_index_data.size(); - chunk_manager->Write( - object_key, serialized_index_data.data(), serialized_index_size); - return std::make_pair(std::move(object_key), serialized_index_size); + auto serialized_inserted_data = insertData->serialize_to_remote_file(); + auto serialized_inserted_data_size = serialized_inserted_data.size(); + chunk_manager->Write(object_key, + serialized_inserted_data.data(), + serialized_inserted_data_size); + return std::make_pair(std::move(object_key), serialized_inserted_data_size); } std::vector>> @@ -738,6 +763,9 @@ CreateFieldData(const DataType& type, int64_t dim, int64_t total_num_rows) { case DataType::VECTOR_BFLOAT16: return std::make_shared>( dim, type, total_num_rows); + case DataType::VECTOR_SPARSE_FLOAT: + return std::make_shared>( + type, total_num_rows); default: throw SegcoreError( DataTypeInvalid, diff --git a/internal/core/src/storage/parquet_c.h b/internal/core/src/storage/parquet_c.h index db54eb7c63ed8..0348c9461bd06 100644 --- a/internal/core/src/storage/parquet_c.h +++ b/internal/core/src/storage/parquet_c.h @@ -31,6 +31,8 @@ typedef struct CBuffer { } CBuffer; //============= payload writer ====================== +// TODO(SPARSE): CPayloadWriter is no longer used as we switch to the payload +// writer in golang. Thus not implementing sparse float vector support here. 
typedef void* CPayloadWriter; CPayloadWriter NewPayloadWriter(int columnType); diff --git a/internal/core/unittest/test_binlog_index.cpp b/internal/core/unittest/test_binlog_index.cpp index d4ddc870a6abe..2737af6c92a9c 100644 --- a/internal/core/unittest/test_binlog_index.cpp +++ b/internal/core/unittest/test_binlog_index.cpp @@ -141,9 +141,9 @@ class BinlogIndexTest : public ::testing::TestWithParam { std::shared_ptr vec_data; }; -INSTANTIATE_TEST_CASE_P(MetricTypeParameters, - BinlogIndexTest, - ::testing::Values(knowhere::metric::L2)); +INSTANTIATE_TEST_SUITE_P(MetricTypeParameters, + BinlogIndexTest, + ::testing::Values(knowhere::metric::L2)); TEST_P(BinlogIndexTest, Accuracy) { IndexMetaPtr collection_index_meta = diff --git a/internal/core/unittest/test_concurrent_vector.cpp b/internal/core/unittest/test_concurrent_vector.cpp index eea65bd4faa00..a59f07d0dea4a 100644 --- a/internal/core/unittest/test_concurrent_vector.cpp +++ b/internal/core/unittest/test_concurrent_vector.cpp @@ -34,8 +34,7 @@ TEST(ConcurrentVector, TestSingle) { for (auto& x : vec) { x = data++; } - c_vec.grow_to_at_least(total_count + insert_size); - c_vec.set_data(total_count, vec.data(), insert_size); + c_vec.set_data_raw(total_count, vec.data(), insert_size); total_count += insert_size; } ASSERT_EQ(c_vec.num_chunk(), (total_count + 31) / 32); @@ -66,8 +65,7 @@ TEST(ConcurrentVector, TestMultithreads) { x = data++ * threads + thread_id; } auto offset = ack_counter.fetch_add(insert_size); - c_vec.grow_to_at_least(offset + insert_size); - c_vec.set_data(offset, vec.data(), insert_size); + c_vec.set_data_raw(offset, vec.data(), insert_size); total_count += insert_size; } assert(data == total_count * dim); diff --git a/internal/core/unittest/test_data_codec.cpp b/internal/core/unittest/test_data_codec.cpp index 36a6621bc9953..0a4e7b36ff657 100644 --- a/internal/core/unittest/test_data_codec.cpp +++ b/internal/core/unittest/test_data_codec.cpp @@ -22,6 +22,8 @@ #include "storage/Util.h" 
#include "common/Consts.h" #include "common/Json.h" +#include "test_utils/Constants.h" +#include "test_utils/DataGen.h" using namespace milvus; @@ -274,6 +276,45 @@ TEST(storage, InsertDataFloatVector) { ASSERT_EQ(data, new_data); } +TEST(storage, InsertDataSparseFloat) { + auto n_rows = 100; + auto vecs = milvus::segcore::GenerateRandomSparseFloatVector( + n_rows, kTestSparseDim, kTestSparseVectorDensity); + auto field_data = milvus::storage::CreateFieldData( + storage::DataType::VECTOR_SPARSE_FLOAT, kTestSparseDim, n_rows); + field_data->FillFieldData(vecs.get(), n_rows); + + storage::InsertData insert_data(field_data); + storage::FieldDataMeta field_data_meta{100, 101, 102, 103}; + insert_data.SetFieldDataMeta(field_data_meta); + insert_data.SetTimestamps(0, 100); + + auto serialized_bytes = insert_data.Serialize(storage::StorageType::Remote); + std::shared_ptr serialized_data_ptr(serialized_bytes.data(), + [&](uint8_t*) {}); + auto new_insert_data = storage::DeserializeFileData( + serialized_data_ptr, serialized_bytes.size()); + ASSERT_EQ(new_insert_data->GetCodecType(), storage::InsertDataType); + ASSERT_EQ(new_insert_data->GetTimeRage(), + std::make_pair(Timestamp(0), Timestamp(100))); + auto new_payload = new_insert_data->GetFieldData(); + ASSERT_TRUE(new_payload->get_data_type() == + storage::DataType::VECTOR_SPARSE_FLOAT); + ASSERT_EQ(new_payload->get_num_rows(), n_rows); + auto new_data = static_cast*>( + new_payload->Data()); + + for (auto i = 0; i < n_rows; ++i) { + auto& original = vecs[i]; + auto& new_vec = new_data[i]; + ASSERT_EQ(original.size(), new_vec.size()); + for (auto j = 0; j < original.size(); ++j) { + ASSERT_EQ(original[j].id, new_vec[j].id); + ASSERT_EQ(original[j].val, new_vec[j].val); + } + } +} + TEST(storage, InsertDataBinaryVector) { std::vector data = {1, 2, 3, 4, 5, 6, 7, 8}; int DIM = 16; diff --git a/internal/core/unittest/test_growing_index.cpp b/internal/core/unittest/test_growing_index.cpp index eb12a2793d557..b7814ef5b6b7e 
100644 --- a/internal/core/unittest/test_growing_index.cpp +++ b/internal/core/unittest/test_growing_index.cpp @@ -155,11 +155,11 @@ class GrowingIndexGetVectorTest : public ::testing::TestWithParam { const char* metricType; }; -INSTANTIATE_TEST_CASE_P(IndexTypeParameters, - GrowingIndexGetVectorTest, - ::testing::Values(knowhere::metric::L2, - knowhere::metric::COSINE, - knowhere::metric::IP)); +INSTANTIATE_TEST_SUITE_P(IndexTypeParameters, + GrowingIndexGetVectorTest, + ::testing::Values(knowhere::metric::L2, + knowhere::metric::COSINE, + knowhere::metric::IP)); TEST_P(GrowingIndexGetVectorTest, GetVector) { auto schema = std::make_shared(); diff --git a/internal/core/unittest/test_index_c_api.cpp b/internal/core/unittest/test_index_c_api.cpp index f76f864d3e06a..042255028ab3d 100644 --- a/internal/core/unittest/test_index_c_api.cpp +++ b/internal/core/unittest/test_index_c_api.cpp @@ -79,6 +79,70 @@ TEST(FloatVecIndex, All) { { DeleteBinarySet(binary_set); } } +TEST(SparseFloatVecIndex, All) { + auto index_type = knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX; + auto metric_type = knowhere::metric::IP; + indexcgo::TypeParams type_params; + indexcgo::IndexParams index_params; + std::tie(type_params, index_params) = + generate_params(index_type, metric_type); + std::string type_params_str, index_params_str; + bool ok = google::protobuf::TextFormat::PrintToString(type_params, + &type_params_str); + assert(ok); + ok = google::protobuf::TextFormat::PrintToString(index_params, + &index_params_str); + assert(ok); + auto dataset = GenDatasetWithDataType( + NB, metric_type, milvus::DataType::VECTOR_SPARSE_FLOAT); + auto xb_data = dataset.get_col>( + milvus::FieldId(100)); + CDataType dtype = SparseFloatVector; + CIndex index; + CStatus status; + CBinarySet binary_set; + CIndex copy_index; + + { + status = CreateIndexV0( + dtype, type_params_str.c_str(), index_params_str.c_str(), &index); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = 
BuildSparseFloatVecIndex( + index, + NB, + kTestSparseDim, + static_cast( + static_cast(xb_data.data()))); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = SerializeIndexToBinarySet(index, &binary_set); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = CreateIndexV0(dtype, + type_params_str.c_str(), + index_params_str.c_str(), + ©_index); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = LoadIndexFromBinarySet(copy_index, binary_set); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = DeleteIndex(index); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = DeleteIndex(copy_index); + ASSERT_EQ(milvus::Success, status.error_code); + } + { DeleteBinarySet(binary_set); } +} + TEST(Float16VecIndex, All) { auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ; auto metric_type = knowhere::metric::L2; diff --git a/internal/core/unittest/test_index_wrapper.cpp b/internal/core/unittest/test_index_wrapper.cpp index 1b5de55a2b1d4..22529f31d572a 100644 --- a/internal/core/unittest/test_index_wrapper.cpp +++ b/internal/core/unittest/test_index_wrapper.cpp @@ -59,35 +59,23 @@ class IndexWrapperTest : public ::testing::TestWithParam { search_conf = generate_search_conf(index_type, metric_type); - std::map is_binary_map = { - {knowhere::IndexEnum::INDEX_FAISS_IDMAP, false}, - {knowhere::IndexEnum::INDEX_FAISS_IVFPQ, false}, - {knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, false}, - {knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, false}, - {knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, true}, - {knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP, true}, - {knowhere::IndexEnum::INDEX_HNSW, false}, + std::map index_to_vec_type = { + {knowhere::IndexEnum::INDEX_FAISS_IDMAP, DataType::VECTOR_FLOAT}, + {knowhere::IndexEnum::INDEX_FAISS_IVFPQ, DataType::VECTOR_FLOAT}, + {knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, DataType::VECTOR_FLOAT}, + {knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, DataType::VECTOR_FLOAT}, + 
{knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, + DataType::VECTOR_BINARY}, + {knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP, + DataType::VECTOR_BINARY}, + {knowhere::IndexEnum::INDEX_HNSW, DataType::VECTOR_FLOAT}, + {knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + DataType::VECTOR_SPARSE_FLOAT}, + {knowhere::IndexEnum::INDEX_SPARSE_WAND, + DataType::VECTOR_SPARSE_FLOAT}, }; - is_binary = is_binary_map[index_type]; - if (is_binary) { - vec_field_data_type = DataType::VECTOR_BINARY; - } else { - vec_field_data_type = DataType::VECTOR_FLOAT; - } - - auto dataset = GenDataset(NB, metric_type, is_binary); - if (!is_binary) { - xb_data = dataset.get_col(milvus::FieldId(100)); - xb_dataset = knowhere::GenDataSet(NB, DIM, xb_data.data()); - xq_dataset = knowhere::GenDataSet( - NQ, DIM, xb_data.data() + DIM * query_offset); - } else { - xb_bin_data = dataset.get_col(milvus::FieldId(100)); - xb_dataset = knowhere::GenDataSet(NB, DIM, xb_bin_data.data()); - xq_dataset = knowhere::GenDataSet( - NQ, DIM, xb_bin_data.data() + DIM * query_offset); - } + vec_field_data_type = index_to_vec_type[index_type]; } void @@ -101,18 +89,13 @@ class IndexWrapperTest : public ::testing::TestWithParam { std::string type_params_str, index_params_str; Config config; milvus::Config search_conf; - bool is_binary; DataType vec_field_data_type; - knowhere::DataSetPtr xb_dataset; - FixedVector xb_data; - FixedVector xb_bin_data; - knowhere::DataSetPtr xq_dataset; - int64_t query_offset = 100; - int64_t NB = 10000; + int64_t query_offset = 1; + int64_t NB = 10; StorageConfig storage_config_; }; -INSTANTIATE_TEST_CASE_P( +INSTANTIATE_TEST_SUITE_P( IndexTypeParameters, IndexWrapperTest, ::testing::Values( @@ -126,7 +109,11 @@ INSTANTIATE_TEST_CASE_P( knowhere::metric::JACCARD), std::pair(knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP, knowhere::metric::JACCARD), - std::pair(knowhere::IndexEnum::INDEX_HNSW, knowhere::metric::L2))); + std::pair(knowhere::IndexEnum::INDEX_HNSW, knowhere::metric::L2), + 
std::pair(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::IP), + std::pair(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::IP))); TEST_P(IndexWrapperTest, BuildAndQuery) { milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100}; @@ -139,20 +126,29 @@ TEST_P(IndexWrapperTest, BuildAndQuery) { std::to_string(knowhere::Version::GetCurrentVersion().VersionNumber()); auto index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex( vec_field_data_type, config, file_manager_context); - - auto dataset = GenDataset(NB, metric_type, is_binary); knowhere::DataSetPtr xb_dataset; - FixedVector bin_vecs; - FixedVector f_vecs; - if (is_binary) { - bin_vecs = dataset.get_col(milvus::FieldId(100)); + if (vec_field_data_type == DataType::VECTOR_BINARY) { + auto dataset = GenDataset(NB, metric_type, true); + auto bin_vecs = dataset.get_col(milvus::FieldId(100)); xb_dataset = knowhere::GenDataSet(NB, DIM, bin_vecs.data()); + ASSERT_NO_THROW(index->Build(xb_dataset)); + } else if (vec_field_data_type == DataType::VECTOR_SPARSE_FLOAT) { + auto dataset = GenDatasetWithDataType( + NB, metric_type, milvus::DataType::VECTOR_SPARSE_FLOAT); + auto sparse_vecs = dataset.get_col>( + milvus::FieldId(100)); + xb_dataset = + knowhere::GenDataSet(NB, kTestSparseDim, sparse_vecs.data()); + xb_dataset->SetIsSparse(true); + ASSERT_NO_THROW(index->Build(xb_dataset)); } else { - f_vecs = dataset.get_col(milvus::FieldId(100)); + // VECTOR_FLOAT + auto dataset = GenDataset(NB, metric_type, false); + auto f_vecs = dataset.get_col(milvus::FieldId(100)); xb_dataset = knowhere::GenDataSet(NB, DIM, f_vecs.data()); + ASSERT_NO_THROW(index->Build(xb_dataset)); } - ASSERT_NO_THROW(index->Build(xb_dataset)); auto binary_set = index->Serialize(); FixedVector index_files; for (auto& binary : binary_set.binary_map_) { @@ -164,21 +160,53 @@ TEST_P(IndexWrapperTest, BuildAndQuery) { vec_field_data_type, config, file_manager_context); auto vec_index = 
static_cast(copy_index.get()); - ASSERT_EQ(vec_index->dim(), DIM); + if (vec_field_data_type != DataType::VECTOR_SPARSE_FLOAT) { + ASSERT_EQ(vec_index->dim(), DIM); + } ASSERT_NO_THROW(vec_index->Load(binary_set)); + if (vec_field_data_type == DataType::VECTOR_SPARSE_FLOAT) { + // TODO(SPARSE): complete test in PR adding search/query to sparse + // float vector. + return; + } + milvus::SearchInfo search_info; search_info.topk_ = K; search_info.metric_type_ = metric_type; search_info.search_params_ = search_conf; - auto result = vec_index->Query(xq_dataset, search_info, nullptr); + std::unique_ptr result; + if (vec_field_data_type == DataType::VECTOR_FLOAT) { + auto dataset = GenDataset(NB, metric_type, false); + auto xb_data = dataset.get_col(milvus::FieldId(100)); + auto xb_dataset = knowhere::GenDataSet(NB, DIM, xb_data.data()); + auto xq_dataset = + knowhere::GenDataSet(NQ, DIM, xb_data.data() + DIM * query_offset); + result = vec_index->Query(xq_dataset, search_info, nullptr); + } else if (vec_field_data_type == DataType::VECTOR_SPARSE_FLOAT) { + auto dataset = GenDatasetWithDataType( + NQ, metric_type, milvus::DataType::VECTOR_SPARSE_FLOAT); + auto xb_data = dataset.get_col>( + milvus::FieldId(100)); + auto xq_dataset = + knowhere::GenDataSet(NQ, kTestSparseDim, xb_data.data()); + xq_dataset->SetIsSparse(true); + result = vec_index->Query(xq_dataset, search_info, nullptr); + } else { + auto dataset = GenDataset(NB, metric_type, true); + auto xb_bin_data = dataset.get_col(milvus::FieldId(100)); + auto xb_dataset = knowhere::GenDataSet(NB, DIM, xb_bin_data.data()); + auto xq_dataset = knowhere::GenDataSet( + NQ, DIM, xb_bin_data.data() + DIM * query_offset); + result = vec_index->Query(xq_dataset, search_info, nullptr); + } EXPECT_EQ(result->total_nq_, NQ); EXPECT_EQ(result->unity_topK_, K); EXPECT_EQ(result->distances_.size(), NQ * K); EXPECT_EQ(result->seg_offsets_.size(), NQ * K); - if (!is_binary) { + if (vec_field_data_type == DataType::VECTOR_FLOAT) { 
EXPECT_EQ(result->seg_offsets_[0], query_offset); } } diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp index 3a8efc18d6899..dd1cfdf68c69e 100644 --- a/internal/core/unittest/test_indexing.cpp +++ b/internal/core/unittest/test_indexing.cpp @@ -383,7 +383,7 @@ class IndexTest : public ::testing::TestWithParam { StorageConfig storage_config_; }; -INSTANTIATE_TEST_CASE_P( +INSTANTIATE_TEST_SUITE_P( IndexTypeParameters, IndexTest, ::testing::Values( @@ -990,7 +990,7 @@ TEST(Indexing, SearchDiskAnnWithInvalidParam) { // boost::filesystem::path mmap_file_path; //}; // -//INSTANTIATE_TEST_CASE_P( +//INSTANTIATE_TEST_SUITE_P( // IndexTypeParameters, // IndexTestV2, // testing::Combine( diff --git a/internal/core/unittest/test_range_search_sort.cpp b/internal/core/unittest/test_range_search_sort.cpp index ac8208c7a9462..bc95badde075b 100644 --- a/internal/core/unittest/test_range_search_sort.cpp +++ b/internal/core/unittest/test_range_search_sort.cpp @@ -157,12 +157,12 @@ class RangeSearchSortTest float dist_min = 0.0, dist_max = 100.0; }; -INSTANTIATE_TEST_CASE_P(RangeSearchSortParameters, - RangeSearchSortTest, - ::testing::Values(knowhere::metric::L2, - knowhere::metric::IP, - knowhere::metric::JACCARD, - knowhere::metric::HAMMING)); +INSTANTIATE_TEST_SUITE_P(RangeSearchSortParameters, + RangeSearchSortTest, + ::testing::Values(knowhere::metric::L2, + knowhere::metric::IP, + knowhere::metric::JACCARD, + knowhere::metric::HAMMING)); TEST_P(RangeSearchSortTest, CheckRangeSearchSort) { auto res = milvus::ReGenRangeSearchResult(dataset, TOPK, N, metric_type); diff --git a/internal/core/unittest/test_retrieve.cpp b/internal/core/unittest/test_retrieve.cpp index 2115b6086a0e6..0139d2e7c1cc1 100644 --- a/internal/core/unittest/test_retrieve.cpp +++ b/internal/core/unittest/test_retrieve.cpp @@ -15,7 +15,6 @@ #include "knowhere/comp/index_param.h" #include "query/Expr.h" #include "query/ExprImpl.h" -#include "segcore/ScalarIndex.h" 
#include "test_utils/DataGen.h" #include "exec/expression/Expr.h" #include "plan/PlanNode.h" @@ -30,32 +29,6 @@ RetrieveUsingDefaultOutputSize(SegmentInterface* segment, return segment->Retrieve(plan, timestamp, DEFAULT_MAX_OUTPUT_SIZE); } -TEST(Retrieve, ScalarIndex) { - SUCCEED(); - auto index = std::make_unique(); - std::vector data; - int N = 1000; - auto req_ids = std::make_unique(); - auto req_ids_arr = req_ids->mutable_int_id(); - - for (int i = 0; i < N; ++i) { - data.push_back(i * 3 % N); - req_ids_arr->add_data(i); - } - index->append_data(data.data(), N, SegOffset(10000)); - index->build(); - - auto [res_ids, res_offsets] = index->do_search_ids(*req_ids); - auto res_ids_arr = res_ids->int_id(); - - for (int i = 0; i < N; ++i) { - auto res_offset = res_offsets[i].get() - 10000; - auto res_id = res_ids_arr.data(i); - auto std_id = (res_offset * 3 % N); - ASSERT_EQ(res_id, std_id); - } -} - TEST(Retrieve, AutoID) { auto schema = std::make_shared(); auto fid_64 = schema->AddDebugField("i64", DataType::INT64); diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index 81ff9f8d3d140..ed247d38678f8 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -9,6 +9,10 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. 
See the License for the specific language governing permissions and limitations under the License +#include +#include +#include + #include #include #include @@ -19,6 +23,8 @@ #include "common/Types.h" #include "common/Utils.h" #include "common/Exception.h" +#include "knowhere/sparse_utils.h" +#include "pb/schema.pb.h" #include "query/Utils.h" #include "test_utils/DataGen.h" @@ -131,8 +137,7 @@ TEST(Util, upper_bound) { std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; ConcurrentVector timestamps(1); - timestamps.grow_to_at_least(data.size()); - timestamps.set_data(0, data.data(), data.size()); + timestamps.set_data_raw(0, data.data(), data.size()); ASSERT_EQ(1, upper_bound(timestamps, 0, data.size(), 0)); ASSERT_EQ(5, upper_bound(timestamps, 0, data.size(), 4)); @@ -206,4 +211,4 @@ TEST(Util, get_common_prefix) { str2 = ""; common_prefix = milvus::GetCommonPrefix(str1, str2); EXPECT_STREQ(common_prefix.c_str(), ""); -} \ No newline at end of file +} diff --git a/internal/core/unittest/test_utils/Constants.h b/internal/core/unittest/test_utils/Constants.h index 190853a968f6f..dfeae7b77f89c 100644 --- a/internal/core/unittest/test_utils/Constants.h +++ b/internal/core/unittest/test_utils/Constants.h @@ -13,3 +13,6 @@ constexpr int64_t TestChunkSize = 32 * 1024; constexpr char TestLocalPath[] = "/tmp/milvus/local_data/"; constexpr char TestRemotePath[] = "/tmp/milvus/remote_data"; + +constexpr int64_t kTestSparseDim = 10000; +constexpr float kTestSparseVectorDensity = 0.0003; diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index 784446d5108c1..2350cca978512 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -16,7 +16,9 @@ #include #include #include +#include #include +#include #include "Constants.h" #include "common/EasyAssert.h" @@ -42,7 +44,7 @@ namespace milvus::segcore { struct GeneratedData { std::vector row_ids_; std::vector timestamps_; - InsertData* 
raw_; + InsertRecordProto* raw_; std::vector field_ids; SchemaPtr schema_; @@ -92,7 +94,8 @@ struct GeneratedData { } auto& field_meta = schema_->operator[](field_id); - if (field_meta.is_vector()) { + if (field_meta.is_vector() && + field_meta.get_data_type() != DataType::VECTOR_SPARSE_FLOAT) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { int len = raw_->num_rows() * field_meta.get_dim(); ret.resize(len); @@ -111,7 +114,6 @@ struct GeneratedData { std::copy_n(src_data, len, ret.data()); } else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) { - // int len = raw_->num_rows() * field_meta.get_dim() * sizeof(float16); int len = raw_->num_rows() * field_meta.get_dim(); ret.resize(len); auto src_data = reinterpret_cast( @@ -119,7 +121,6 @@ struct GeneratedData { std::copy_n(src_data, len, ret.data()); } else if (field_meta.get_data_type() == DataType::VECTOR_BFLOAT16) { - // int len = raw_->num_rows() * field_meta.get_dim() * sizeof(bfloat16); int len = raw_->num_rows() * field_meta.get_dim(); ret.resize(len); auto src_data = reinterpret_cast( @@ -131,7 +132,13 @@ struct GeneratedData { return std::move(ret); } - if constexpr (std::is_same_v) { + if constexpr (std::is_same_v>) { + auto sparse_float_array = + target_field_data.vectors().sparse_float_vector(); + auto rows = SparseBytesToRows(sparse_float_array.contents()); + std::copy_n(rows.get(), raw_->num_rows(), ret.data()); + } else if constexpr (std::is_same_v) { auto ret_data = reinterpret_cast(ret.data()); auto src_data = target_field_data.scalars().array_data().data(); std::copy(src_data.begin(), src_data.end(), ret_data); @@ -238,19 +245,61 @@ struct GeneratedData { int array_len); }; -inline GeneratedData -DataGen(SchemaPtr schema, - int64_t N, - uint64_t seed = 42, - uint64_t ts_offset = 0, - int repeat_count = 1, - int array_len = 10) { +inline std::unique_ptr[]> +GenerateRandomSparseFloatVector(size_t rows, + size_t cols, + float density, + int seed = 42) { + int32_t num_elements 
= static_cast(rows * cols * density); + + std::mt19937 rng(seed); + auto real_distrib = std::uniform_real_distribution(0, 1); + auto row_distrib = std::uniform_int_distribution(0, rows - 1); + auto col_distrib = std::uniform_int_distribution(0, cols - 1); + + std::vector> data(rows); + + for (int32_t i = 0; i < num_elements; ++i) { + auto row = row_distrib(rng); + while (data[row].size() == (size_t)cols) { + row = row_distrib(rng); + } + auto col = col_distrib(rng); + while (data[row].find(col) != data[row].end()) { + col = col_distrib(rng); + } + auto val = real_distrib(rng); + data[row][col] = val; + } + + auto tensor = std::make_unique[]>(rows); + + for (int32_t i = 0; i < rows; ++i) { + if (data[i].size() == 0) { + continue; + } + knowhere::sparse::SparseRow row(data[i].size()); + size_t j = 0; + for (auto& [idx, val] : data[i]) { + row.set_at(j++, idx, val); + } + tensor[i] = std::move(row); + } + return tensor; +} + +inline GeneratedData DataGen(SchemaPtr schema, + int64_t N, + uint64_t seed = 42, + uint64_t ts_offset = 0, + int repeat_count = 1, + int array_len = 10) { using std::vector; std::default_random_engine er(seed); std::normal_distribution<> distr(0, 1); int offset = 0; - auto insert_data = std::make_unique(); + auto insert_data = std::make_unique(); auto insert_cols = [&insert_data]( auto& data, int64_t count, auto& field_meta) { auto array = milvus::segcore::CreateDataArrayFrom( @@ -309,6 +358,15 @@ DataGen(SchemaPtr schema, insert_cols(final, N, field_meta); break; } + case DataType::VECTOR_SPARSE_FLOAT: { + auto res = GenerateRandomSparseFloatVector( + N, kTestSparseDim, kTestSparseVectorDensity, seed); + auto array = milvus::segcore::CreateDataArrayFrom( + res.get(), N, field_meta); + insert_data->mutable_fields_data()->AddAllocated( + array.release()); + break; + } case DataType::VECTOR_BFLOAT16: { auto dim = field_meta.get_dim(); @@ -526,7 +584,7 @@ DataGenForJsonArray(SchemaPtr schema, std::default_random_engine er(seed); 
std::normal_distribution<> distr(0, 1); - auto insert_data = std::make_unique(); + auto insert_data = std::make_unique(); auto insert_cols = [&insert_data]( auto& data, int64_t count, auto& field_meta) { auto array = milvus::segcore::CreateDataArrayFrom( @@ -777,6 +835,23 @@ CreateBFloat16PlaceholderGroupFromBlob(int64_t num_queries, return raw_group; } +inline auto +CreateSparseFloatPlaceholderGroup(int64_t num_queries, int64_t seed = 42) { + namespace ser = milvus::proto::common; + ser::PlaceholderGroup raw_group; + auto value = raw_group.add_placeholders(); + + value->set_tag("$0"); + value->set_type(ser::PlaceholderType::SparseFloatVector); + auto sparse_vecs = GenerateRandomSparseFloatVector( + num_queries, kTestSparseDim, kTestSparseVectorDensity, seed); + for (int i = 0; i < num_queries; ++i) { + value->add_values(sparse_vecs[i].data(), + sparse_vecs[i].data_byte_size()); + } + return raw_group; +} + inline auto SearchResultToVector(const SearchResult& sr) { int64_t num_queries = sr.total_nq_; @@ -850,6 +925,12 @@ CreateFieldDataFromDataArray(ssize_t raw_count, createFieldData(raw_data, DataType::VECTOR_BFLOAT16, dim); break; } + case DataType::VECTOR_SPARSE_FLOAT: { + auto sparse_float_array = data->vectors().sparse_float_vector(); + auto rows = SparseBytesToRows(sparse_float_array.contents()); + createFieldData(rows.get(), DataType::VECTOR_SPARSE_FLOAT, 0); + break; + } default: { PanicInfo(Unsupported, "unsupported"); } diff --git a/internal/core/unittest/test_utils/c_api_test_utils.h b/internal/core/unittest/test_utils/c_api_test_utils.h index e57cb2615eb6d..1e9c975fe6624 100644 --- a/internal/core/unittest/test_utils/c_api_test_utils.h +++ b/internal/core/unittest/test_utils/c_api_test_utils.h @@ -37,31 +37,6 @@ using namespace milvus; using namespace milvus::segcore; namespace { -const char* -get_default_schema_config() { - static std::string conf = R"(name: "default-collection" - fields: < - fieldID: 100 - name: "fakevec" - data_type: FloatVector - 
type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID: 101 - name: "age" - data_type: Int64 - is_primary_key: true - >)"; - static std::string fake_conf = ""; - return conf.c_str(); -} std::string generate_max_float_query_data(int all_nq, int max_float_nq) { diff --git a/internal/core/unittest/test_utils/indexbuilder_test_utils.h b/internal/core/unittest/test_utils/indexbuilder_test_utils.h index b3c4dda99a2bc..fc1f3b67fc141 100644 --- a/internal/core/unittest/test_utils/indexbuilder_test_utils.h +++ b/internal/core/unittest/test_utils/indexbuilder_test_utils.h @@ -98,6 +98,11 @@ generate_build_conf(const milvus::IndexType& index_type, {milvus::index::DISK_ANN_BUILD_DRAM_BUDGET, std::to_string(32)}, {milvus::index::DISK_ANN_BUILD_THREAD_NUM, std::to_string(2)}, }; + } else if (index_type == knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || + index_type == knowhere::IndexEnum::INDEX_SPARSE_WAND) { + return knowhere::Json{ + {knowhere::meta::METRIC_TYPE, metric_type}, + }; } return knowhere::Json(); } @@ -235,6 +240,10 @@ GenDatasetWithDataType(int64_t N, schema->AddDebugField( "fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type); return milvus::segcore::DataGen(schema, N); + } else if (data_type == milvus::DataType::VECTOR_SPARSE_FLOAT) { + schema->AddDebugField( + "fakevec", milvus::DataType::VECTOR_SPARSE_FLOAT, 0, metric_type); + return milvus::segcore::DataGen(schema, N); } else { schema->AddDebugField( "fakebinvec", milvus::DataType::VECTOR_BINARY, dim, metric_type);