Skip to content

Commit

Permalink
[Sparse Float Vector] segcore basics and index building
Browse files Browse the repository at this point in the history
This commit adds sparse float vector support to segcore with the following:

1. data type enum declarations
2. Adds corresponding data structures for handling sparse float vectors
   in various scenarios, including:
  * FieldData as a bridge between the binlog and the in-memory data structures;
  * mmap::Column as the in-memory representation of a sparse float vector
    column of a sealed segment;
  * ConcurrentVector as the in-memory representation of a sparse float
    vector of a growing segment, which supports inserts.
3. Adds logic to the payload writer/reader to serialize to/deserialize from binlog
4. Allows the index node to build sparse float vector indexes
5. Allows the query node to build a growing index for growing segments and
   a temp index for sealed segments that have no index built

This commit also includes some code cleanup, comment improvements, and
some unit tests for sparse vectors.

Signed-off-by: Buqian Zheng <[email protected]>
  • Loading branch information
zhengbuqian committed Mar 8, 2024
1 parent cc51ab9 commit 2c23d04
Show file tree
Hide file tree
Showing 58 changed files with 1,191 additions and 610 deletions.
23 changes: 18 additions & 5 deletions internal/core/src/common/FieldData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

namespace milvus {

template <typename Type, bool is_scalar>
template <typename Type, bool is_type_entire_row>
void
FieldDataImpl<Type, is_scalar>::FillFieldData(const void* source,
ssize_t element_count) {
FieldDataImpl<Type, is_type_entire_row>::FillFieldData(const void* source,
ssize_t element_count) {
if (element_count == 0) {
return;
}
Expand Down Expand Up @@ -57,9 +57,9 @@ GetDataInfoFromArray(const std::shared_ptr<arrow::Array> array) {
return std::make_pair(typed_array->raw_values(), element_count);
}

template <typename Type, bool is_scalar>
template <typename Type, bool is_type_entire_row>
void
FieldDataImpl<Type, is_scalar>::FillFieldData(
FieldDataImpl<Type, is_type_entire_row>::FillFieldData(
const std::shared_ptr<arrow::Array> array) {
AssertInfo(array != nullptr, "null arrow array");
auto element_count = array->length();
Expand Down Expand Up @@ -159,6 +159,18 @@ FieldDataImpl<Type, is_scalar>::FillFieldData(
array);
return FillFieldData(array_info.first, array_info.second);
}
case DataType::VECTOR_SPARSE_FLOAT: {
AssertInfo(array->type()->id() == arrow::Type::type::BINARY,
"inconsistent data type");
auto arr = std::dynamic_pointer_cast<arrow::BinaryArray>(array);
std::vector<knowhere::sparse::SparseRow<float>> values;
for (size_t index = 0; index < element_count; ++index) {
auto view = arr->GetString(index);
values.push_back(
CopyAndWrapSparseRow(view.data(), view.size()));
}
return FillFieldData(values.data(), element_count);
}
default: {
throw SegcoreError(DataTypeInvalid,
GetName() + "::FillFieldData" +
Expand Down Expand Up @@ -186,6 +198,7 @@ template class FieldDataImpl<int8_t, false>;
template class FieldDataImpl<float, false>;
template class FieldDataImpl<float16, false>;
template class FieldDataImpl<bfloat16, false>;
template class FieldDataImpl<knowhere::sparse::SparseRow<float>, true>;

FieldDataPtr
InitScalarFieldData(const DataType& type, int64_t cap_rows) {
Expand Down
8 changes: 8 additions & 0 deletions internal/core/src/common/FieldData.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ class FieldData<BFloat16Vector> : public FieldDataImpl<bfloat16, false> {
}
};

// FieldData specialization for SparseFloatVector. It adds no state or
// behavior of its own: construction is forwarded to
// FieldDataSparseVectorImpl.
template <>
class FieldData<SparseFloatVector> : public FieldDataSparseVectorImpl {
public:
// data_type and buffered_num_rows are passed through unchanged to the
// FieldDataSparseVectorImpl constructor; buffered_num_rows defaults to 0.
explicit FieldData(DataType data_type, int64_t buffered_num_rows = 0)
: FieldDataSparseVectorImpl(data_type, buffered_num_rows) {
}
};

using FieldDataPtr = std::shared_ptr<FieldDataBase>;
using FieldDataChannel = Channel<FieldDataPtr>;
using FieldDataChannelPtr = std::shared_ptr<FieldDataChannel>;
Expand Down
124 changes: 115 additions & 9 deletions internal/core/src/common/FieldDataInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "common/VectorTrait.h"
#include "common/EasyAssert.h"
#include "common/Array.h"
#include "knowhere/dataset.h"

namespace milvus {

Expand All @@ -43,24 +44,33 @@ class FieldDataBase {
}
virtual ~FieldDataBase() = default;

// For all FieldDataImpl subclasses, source is a pointer to element_count of
// Type
virtual void
FillFieldData(const void* source, ssize_t element_count) = 0;

virtual void
FillFieldData(const std::shared_ptr<arrow::Array> array) = 0;

// For all FieldDataImpl subclasses, this method returns Type* that points
// at all rows in this field data.
virtual void*
Data() = 0;

// For all FieldDataImpl subclasses, this method returns a Type* that points
// at the offset-th row of this field data.
virtual const void*
RawValue(ssize_t offset) const = 0;

// Returns the serialized bytes size of all rows.
virtual int64_t
Size() const = 0;

// Returns the serialized bytes size of the index-th row.
virtual int64_t
Size(ssize_t index) const = 0;

// Number of filled rows
virtual size_t
Length() const = 0;

Expand All @@ -71,9 +81,11 @@ class FieldDataBase {
Reserve(size_t cap) = 0;

public:
// row capacity
virtual int64_t
get_num_rows() const = 0;

// number of Type elements that represent each row
virtual int64_t
get_dim() const = 0;

Expand All @@ -86,11 +98,9 @@ class FieldDataBase {
const DataType data_type_;
};

template <typename Type, bool is_scalar = false>
template <typename Type, bool is_type_entire_row = false>
class FieldDataImpl : public FieldDataBase {
public:
// constants
using Chunk = FixedVector<Type>;
FieldDataImpl(FieldDataImpl&&) = delete;
FieldDataImpl(const FieldDataImpl&) = delete;

Expand All @@ -105,13 +115,16 @@ class FieldDataImpl : public FieldDataBase {
int64_t buffered_num_rows = 0)
: FieldDataBase(data_type),
num_rows_(buffered_num_rows),
dim_(is_scalar ? 1 : dim) {
dim_(is_type_entire_row ? 1 : dim) {
field_data_.resize(num_rows_ * dim_);
}

explicit FieldDataImpl(size_t dim, DataType type, Chunk&& field_data)
: FieldDataBase(type), dim_(is_scalar ? 1 : dim) {
explicit FieldDataImpl(size_t dim,
DataType type,
FixedVector<Type>&& field_data)
: FieldDataBase(type), dim_(is_type_entire_row ? 1 : dim) {
field_data_ = std::move(field_data);
Assert(field_data.size() % dim == 0);
num_rows_ = field_data.size() / dim;
}

Expand All @@ -122,10 +135,18 @@ class FieldDataImpl : public FieldDataBase {
FillFieldData(const std::shared_ptr<arrow::Array> array) override;

virtual void
FillFieldData(const std::shared_ptr<arrow::StringArray>& array){};
FillFieldData(const std::shared_ptr<arrow::StringArray>& array) {
PanicInfo(NotImplemented,

Check warning on line 139 in internal/core/src/common/FieldDataInterface.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/FieldDataInterface.h#L138-L139

Added lines #L138 - L139 were not covered by tests
"FillFieldData(const std::shared_ptr<arrow::StringArray>& "
"array) not implemented by default");
}

virtual void
FillFieldData(const std::shared_ptr<arrow::BinaryArray>& array){};
FillFieldData(const std::shared_ptr<arrow::BinaryArray>& array) {
PanicInfo(NotImplemented,

Check warning on line 146 in internal/core/src/common/FieldDataInterface.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/FieldDataInterface.h#L145-L146

Added lines #L145 - L146 were not covered by tests
"FillFieldData(const std::shared_ptr<arrow::BinaryArray>& "
"array) not implemented by default");
}

std::string
GetName() const {
Expand Down Expand Up @@ -209,9 +230,11 @@ class FieldDataImpl : public FieldDataBase {
}

protected:
Chunk field_data_;
FixedVector<Type> field_data_;
// number of elements field_data_ can hold
int64_t num_rows_;
mutable std::shared_mutex num_rows_mutex_;
// number of actual elements in field_data_
size_t length_{};
mutable std::shared_mutex tell_mutex_;

Expand Down Expand Up @@ -322,6 +345,89 @@ class FieldDataJsonImpl : public FieldDataImpl<Json, true> {
}
};

// Field data holding one knowhere::sparse::SparseRow<float> per row.
// Besides the storage inherited from FieldDataImpl, it records the largest
// dim() among all rows appended so far (exposed through Dim()).
class FieldDataSparseVectorImpl
    : public FieldDataImpl<knowhere::sparse::SparseRow<float>, true> {
 public:
    // data_type must be DataType::VECTOR_SPARSE_FLOAT; total_num_rows is
    // forwarded to the base class together with a dim of 1.
    explicit FieldDataSparseVectorImpl(DataType data_type,
                                       int64_t total_num_rows = 0)
        : FieldDataImpl<knowhere::sparse::SparseRow<float>, true>(
              /*dim=*/1, data_type, total_num_rows) {
        AssertInfo(data_type == DataType::VECTOR_SPARSE_FLOAT,
                   "invalid data type for sparse vector");
    }

    // Sum of data_byte_size() over the first length() rows.
    int64_t
    Size() const override {
        int64_t total_bytes = 0;
        size_t row_idx = 0;
        while (row_idx < length()) {
            total_bytes += field_data_[row_idx].data_byte_size();
            ++row_idx;
        }
        return total_bytes;
    }

    // data_byte_size() of the row at `offset`. The offset must be below both
    // get_num_rows() and length().
    int64_t
    Size(ssize_t offset) const override {
        AssertInfo(offset < get_num_rows(),
                   "field data subscript out of range");
        AssertInfo(offset < length(),
                   "subscript position don't has valid value");
        const auto& target_row = field_data_[offset];
        return target_row.data_byte_size();
    }

    // Appends element_count rows read from `source`, which is interpreted as
    // a pointer to element_count knowhere::sparse::SparseRow<float> objects.
    // The append runs while holding tell_mutex_.
    void
    FillFieldData(const void* source, ssize_t element_count) override {
        if (element_count == 0) {
            return;
        }

        std::lock_guard lck(tell_mutex_);
        // Grow the storage when the appended rows would not fit.
        const auto required_rows = length_ + element_count;
        if (required_rows > get_num_rows()) {
            resize_field_data(required_rows);
        }

        const auto* incoming =
            static_cast<const knowhere::sparse::SparseRow<float>*>(source);
        // Fold the dims of the incoming rows into the running maximum before
        // copying them into place.
        for (int64_t idx = 0; idx < element_count; ++idx) {
            vec_dim_ = std::max(vec_dim_, incoming[idx].dim());
        }
        std::copy_n(incoming, element_count, field_data_.data() + length_);
        length_ += element_count;
    }

    // Appends one row per entry of `array`; each binary value is turned into
    // a row through CopyAndWrapSparseRow. The append runs while holding
    // tell_mutex_.
    void
    FillFieldData(const std::shared_ptr<arrow::BinaryArray>& array) override {
        const auto n = array->length();
        if (n == 0) {
            return;
        }

        std::lock_guard lck(tell_mutex_);
        // Grow the storage when the appended rows would not fit.
        const auto required_rows = length_ + n;
        if (required_rows > get_num_rows()) {
            resize_field_data(required_rows);
        }

        for (int64_t idx = 0; idx < n; ++idx) {
            const auto bytes = array->GetView(idx);
            auto& slot = field_data_[length_ + idx];
            slot = CopyAndWrapSparseRow(bytes.data(), bytes.size());
            vec_dim_ = std::max(vec_dim_, slot.dim());
        }
        length_ += n;
    }

    // Largest dim() among the rows appended so far; 0 when nothing has been
    // appended.
    int64_t
    Dim() const {
        return vec_dim_;
    }

 private:
    // Running maximum of dim() over all appended rows.
    int64_t vec_dim_ = 0;
};

class FieldDataArrayImpl : public FieldDataImpl<Array, true> {
public:
explicit FieldDataArrayImpl(DataType data_type, int64_t total_num_rows = 0)
Expand Down
23 changes: 22 additions & 1 deletion internal/core/src/common/FieldMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ datatype_sizeof(DataType data_type, int dim = 1) {
case DataType::VECTOR_BFLOAT16: {
return sizeof(bfloat16) * dim;
}
// Not supporting VECTOR_SPARSE_FLOAT here intentionally. We can't
// easily estimate the size of a sparse float vector. Callers of this
// method must handle this case themselves and must not pass the
// VECTOR_SPARSE_FLOAT data_type.
default: {
throw SegcoreError(DataTypeInvalid,
fmt::format("invalid type is {}", data_type));
Expand Down Expand Up @@ -100,6 +104,9 @@ datatype_name(DataType data_type) {
case DataType::VECTOR_BFLOAT16: {
return "vector_bfloat16";
}
case DataType::VECTOR_SPARSE_FLOAT: {
return "vector_sparse_float";

Check warning on line 108 in internal/core/src/common/FieldMeta.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/FieldMeta.h#L107-L108

Added lines #L107 - L108 were not covered by tests
}
default: {
PanicInfo(DataTypeInvalid, "Unsupported DataType({})", data_type);
}
Expand All @@ -111,7 +118,13 @@ datatype_is_vector(DataType datatype) {
return datatype == DataType::VECTOR_BINARY ||
datatype == DataType::VECTOR_FLOAT ||
datatype == DataType::VECTOR_FLOAT16 ||
datatype == DataType::VECTOR_BFLOAT16;
datatype == DataType::VECTOR_BFLOAT16 ||
datatype == DataType::VECTOR_SPARSE_FLOAT;
}

// Returns true only for DataType::VECTOR_SPARSE_FLOAT, false for every other
// data type.
inline bool
datatype_is_sparse_vector(DataType datatype) {
    switch (datatype) {
        case DataType::VECTOR_SPARSE_FLOAT:
            return true;
        default:
            return false;
    }
}

inline bool
Expand Down Expand Up @@ -153,6 +166,7 @@ datatype_is_variable(DataType datatype) {
case DataType::STRING:
case DataType::ARRAY:
case DataType::JSON:
case DataType::VECTOR_SPARSE_FLOAT:
return true;
default:
return false;
Expand Down Expand Up @@ -217,6 +231,8 @@ class FieldMeta {
Assert(datatype_is_array(type_));
}

// Passing any value for dim is fine for a sparse vector, as it will never be
// used: get_dim() must not be invoked on a sparse vector field.
FieldMeta(const FieldName& name,
FieldId id,
DataType type,
Expand All @@ -232,6 +248,8 @@ class FieldMeta {
int64_t
get_dim() const {
Assert(datatype_is_vector(type_));
// should not attempt to get dim() of a sparse vector from schema.
Assert(!datatype_is_sparse_vector(type_));
Assert(vector_info_.has_value());
return vector_info_->dim_;
}
Expand Down Expand Up @@ -282,6 +300,9 @@ class FieldMeta {

size_t
get_sizeof() const {
AssertInfo(!datatype_is_sparse_vector(type_),
"should not attempt to get_sizeof() of a sparse vector from "
"schema");
static const size_t ARRAY_SIZE = 128;
static const size_t JSON_SIZE = 512;
if (is_vector()) {
Expand Down
7 changes: 5 additions & 2 deletions internal/core/src/common/Schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) {
auto type_map = RepeatedKeyValToMap(child.type_params());
auto index_map = RepeatedKeyValToMap(child.index_params());

AssertInfo(type_map.count("dim"), "dim not found");
auto dim = boost::lexical_cast<int64_t>(type_map.at("dim"));
int64_t dim = 0;
if (!datatype_is_sparse_vector(data_type)) {
AssertInfo(type_map.count("dim"), "dim not found");
dim = boost::lexical_cast<int64_t>(type_map.at("dim"));
}
if (!index_map.count("metric_type")) {
schema->AddField(name, field_id, data_type, dim, std::nullopt);
} else {
Expand Down
9 changes: 0 additions & 9 deletions internal/core/src/common/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,6 @@ class Schema {
return fields_.at(field_id);
}

auto
get_total_sizeof() const {
return total_sizeof_;
}

FieldId
get_field_id(const FieldName& field_name) const {
AssertInfo(name_ids_.count(field_name), "Cannot find field_name");
Expand Down Expand Up @@ -181,9 +176,6 @@ class Schema {

fields_.emplace(field_id, field_meta);
field_ids_.emplace_back(field_id);

auto field_sizeof = field_meta.get_sizeof();
total_sizeof_ += field_sizeof;
}

private:
Expand All @@ -197,7 +189,6 @@ class Schema {
std::unordered_map<FieldName, FieldId> name_ids_; // field_name -> field_id
std::unordered_map<FieldId, FieldName> id_names_; // field_id -> field_name

int64_t total_sizeof_ = 0;
std::optional<FieldId> primary_field_id_opt_;
};

Expand Down
Loading

0 comments on commit 2c23d04

Please sign in to comment.