Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [Sparse Float Vector] segcore basics and index building #30357

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions internal/core/src/common/FieldData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

namespace milvus {

template <typename Type, bool is_scalar>
template <typename Type, bool is_type_entire_row>
void
FieldDataImpl<Type, is_scalar>::FillFieldData(const void* source,
ssize_t element_count) {
FieldDataImpl<Type, is_type_entire_row>::FillFieldData(const void* source,
ssize_t element_count) {
if (element_count == 0) {
return;
}
Expand Down Expand Up @@ -57,9 +57,9 @@ GetDataInfoFromArray(const std::shared_ptr<arrow::Array> array) {
return std::make_pair(typed_array->raw_values(), element_count);
}

template <typename Type, bool is_scalar>
template <typename Type, bool is_type_entire_row>
void
FieldDataImpl<Type, is_scalar>::FillFieldData(
FieldDataImpl<Type, is_type_entire_row>::FillFieldData(
const std::shared_ptr<arrow::Array> array) {
AssertInfo(array != nullptr, "null arrow array");
auto element_count = array->length();
Expand Down Expand Up @@ -159,6 +159,18 @@ FieldDataImpl<Type, is_scalar>::FillFieldData(
array);
return FillFieldData(array_info.first, array_info.second);
}
case DataType::VECTOR_SPARSE_FLOAT: {
AssertInfo(array->type()->id() == arrow::Type::type::BINARY,
"inconsistent data type");
auto arr = std::dynamic_pointer_cast<arrow::BinaryArray>(array);
std::vector<knowhere::sparse::SparseRow<float>> values;
for (size_t index = 0; index < element_count; ++index) {
auto view = arr->GetString(index);
values.push_back(
CopyAndWrapSparseRow(view.data(), view.size()));
}
return FillFieldData(values.data(), element_count);
}
default: {
throw SegcoreError(DataTypeInvalid,
GetName() + "::FillFieldData" +
Expand Down Expand Up @@ -186,6 +198,7 @@ template class FieldDataImpl<int8_t, false>;
template class FieldDataImpl<float, false>;
template class FieldDataImpl<float16, false>;
template class FieldDataImpl<bfloat16, false>;
template class FieldDataImpl<knowhere::sparse::SparseRow<float>, true>;

FieldDataPtr
InitScalarFieldData(const DataType& type, int64_t cap_rows) {
Expand Down
8 changes: 8 additions & 0 deletions internal/core/src/common/FieldData.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ class FieldData<BFloat16Vector> : public FieldDataImpl<bfloat16, false> {
}
};

template <>
class FieldData<SparseFloatVector> : public FieldDataSparseVectorImpl {
public:
explicit FieldData(DataType data_type, int64_t buffered_num_rows = 0)
: FieldDataSparseVectorImpl(data_type, buffered_num_rows) {
}
};

using FieldDataPtr = std::shared_ptr<FieldDataBase>;
using FieldDataChannel = Channel<FieldDataPtr>;
using FieldDataChannelPtr = std::shared_ptr<FieldDataChannel>;
Expand Down
124 changes: 115 additions & 9 deletions internal/core/src/common/FieldDataInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "common/VectorTrait.h"
#include "common/EasyAssert.h"
#include "common/Array.h"
#include "knowhere/dataset.h"

namespace milvus {

Expand All @@ -43,24 +44,33 @@
}
virtual ~FieldDataBase() = default;

// For all FieldDataImpl subclasses, source is a pointer to element_count of
// Type
virtual void
FillFieldData(const void* source, ssize_t element_count) = 0;

virtual void
FillFieldData(const std::shared_ptr<arrow::Array> array) = 0;

// For all FieldDataImpl subclasses, this method returns Type* that points
// at all rows in this field data.
virtual void*
Data() = 0;

// For all FieldDataImpl subclasses, this method returns a Type* that points
// at the offset-th row of this field data.
virtual const void*
RawValue(ssize_t offset) const = 0;

// Returns the serialized bytes size of all rows.
virtual int64_t
Size() const = 0;

// Returns the serialized bytes size of the index-th row.
virtual int64_t
Size(ssize_t index) const = 0;

// Number of filled rows
virtual size_t
Length() const = 0;

Expand All @@ -71,9 +81,11 @@
Reserve(size_t cap) = 0;

public:
// row capacity
virtual int64_t
get_num_rows() const = 0;

// number of Type elements that make up one row
virtual int64_t
get_dim() const = 0;

Expand All @@ -86,11 +98,9 @@
const DataType data_type_;
};

template <typename Type, bool is_scalar = false>
template <typename Type, bool is_type_entire_row = false>
class FieldDataImpl : public FieldDataBase {
public:
// constants
using Chunk = FixedVector<Type>;
FieldDataImpl(FieldDataImpl&&) = delete;
FieldDataImpl(const FieldDataImpl&) = delete;

Expand All @@ -105,13 +115,16 @@
int64_t buffered_num_rows = 0)
: FieldDataBase(data_type),
num_rows_(buffered_num_rows),
dim_(is_scalar ? 1 : dim) {
dim_(is_type_entire_row ? 1 : dim) {
field_data_.resize(num_rows_ * dim_);
}

explicit FieldDataImpl(size_t dim, DataType type, Chunk&& field_data)
: FieldDataBase(type), dim_(is_scalar ? 1 : dim) {
explicit FieldDataImpl(size_t dim,
DataType type,
FixedVector<Type>&& field_data)
: FieldDataBase(type), dim_(is_type_entire_row ? 1 : dim) {
field_data_ = std::move(field_data);
Assert(field_data.size() % dim == 0);
num_rows_ = field_data.size() / dim;
}

Expand All @@ -122,10 +135,18 @@
FillFieldData(const std::shared_ptr<arrow::Array> array) override;

// Default handler for filling from an arrow::StringArray. The base
// implementation does not accept this input and reports NotImplemented
// through PanicInfo instead of silently doing nothing. It is virtual so that
// a subclass can supply a real implementation.
virtual void
FillFieldData(const std::shared_ptr<arrow::StringArray>& array) {
    PanicInfo(NotImplemented,
              "FillFieldData(const std::shared_ptr<arrow::StringArray>& "
              "array) not implemented by default");
}

// Default handler for filling from an arrow::BinaryArray. The base
// implementation does not accept this input and reports NotImplemented
// through PanicInfo instead of silently doing nothing. It is virtual so that
// a subclass can supply a real implementation.
virtual void
FillFieldData(const std::shared_ptr<arrow::BinaryArray>& array) {
    PanicInfo(NotImplemented,
              "FillFieldData(const std::shared_ptr<arrow::BinaryArray>& "
              "array) not implemented by default");
}

std::string
GetName() const {
Expand Down Expand Up @@ -209,9 +230,11 @@
}

protected:
Chunk field_data_;
FixedVector<Type> field_data_;
// number of elements field_data_ can hold
int64_t num_rows_;
mutable std::shared_mutex num_rows_mutex_;
// number of actual elements in field_data_
size_t length_{};
mutable std::shared_mutex tell_mutex_;

Expand Down Expand Up @@ -322,6 +345,89 @@
}
};

// Field data holder for sparse float vectors. Each row is stored as a single
// knowhere::sparse::SparseRow<float> element of field_data_ (the base class
// is instantiated with is_type_entire_row = true and dim = 1). The largest
// row dimension seen while filling is tracked separately in vec_dim_.
class FieldDataSparseVectorImpl
    : public FieldDataImpl<knowhere::sparse::SparseRow<float>, true> {
 public:
    // data_type must be DataType::VECTOR_SPARSE_FLOAT (asserted).
    // total_num_rows is forwarded to the base class constructor as its
    // buffered row count.
    explicit FieldDataSparseVectorImpl(DataType data_type,
                                       int64_t total_num_rows = 0)
        : FieldDataImpl<knowhere::sparse::SparseRow<float>, true>(
              /*dim=*/1, data_type, total_num_rows),
          vec_dim_(0) {
        AssertInfo(data_type == DataType::VECTOR_SPARSE_FLOAT,
                   "invalid data type for sparse vector");
    }

    // Returns the sum of data_byte_size() over all filled rows.
    int64_t
    Size() const override {
        int64_t data_size = 0;
        for (size_t i = 0; i < length(); ++i) {
            data_size += field_data_[i].data_byte_size();
        }
        return data_size;
    }

    // Returns data_byte_size() of the row at offset. The offset must be
    // below both the row capacity and the number of filled rows (asserted).
    int64_t
    Size(ssize_t offset) const override {
        AssertInfo(offset < get_num_rows(),
                   "field data subscript out of range");
        AssertInfo(offset < length(),
                   "subscript position don't has valid value");
        return field_data_[offset].data_byte_size();
    }

    // Appends element_count rows to this field data.
    // source is a pointer to element_count of
    // knowhere::sparse::SparseRow<float>. An element_count of 0 is a no-op.
    void
    FillFieldData(const void* source, ssize_t element_count) override {
        if (element_count == 0) {
            return;
        }

        std::lock_guard lck(tell_mutex_);
        // Grow the storage when the new rows do not fit in the current
        // capacity.
        if (length_ + element_count > get_num_rows()) {
            resize_field_data(length_ + element_count);
        }
        auto ptr =
            static_cast<const knowhere::sparse::SparseRow<float>*>(source);
        // Track the largest row dimension seen so far.
        for (int64_t i = 0; i < element_count; ++i) {
            auto& row = ptr[i];
            vec_dim_ = std::max(vec_dim_, row.dim());
        }
        std::copy_n(ptr, element_count, field_data_.data() + length_);
        length_ += element_count;
    }

    // Appends every entry of array as one row.
    // each binary in array is a knowhere::sparse::SparseRow<float>
    // A null array is rejected with an assertion; an empty array is a no-op.
    void
    FillFieldData(const std::shared_ptr<arrow::BinaryArray>& array) override {
        AssertInfo(array != nullptr, "null arrow array");
        auto n = array->length();
        if (n == 0) {
            return;
        }

        std::lock_guard lck(tell_mutex_);
        // Grow the storage when the new rows do not fit in the current
        // capacity.
        if (length_ + n > get_num_rows()) {
            resize_field_data(length_ + n);
        }

        for (int64_t i = 0; i < n; ++i) {
            auto view = array->GetView(i);
            auto& row = field_data_[length_ + i];
            row = CopyAndWrapSparseRow(view.data(), view.size());
            vec_dim_ = std::max(vec_dim_, row.dim());
        }
        length_ += n;
    }

    // Returns the largest row dim() observed across all fill calls so far
    // (0 when nothing has been filled).
    int64_t
    Dim() const {
        return vec_dim_;
    }

 private:
    // Maximum dim() over all rows filled into this object.
    int64_t vec_dim_;
};

class FieldDataArrayImpl : public FieldDataImpl<Array, true> {
public:
explicit FieldDataArrayImpl(DataType data_type, int64_t total_num_rows = 0)
Expand Down
23 changes: 22 additions & 1 deletion internal/core/src/common/FieldMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
case DataType::VECTOR_BFLOAT16: {
return sizeof(bfloat16) * dim;
}
// Not supporting VECTOR_SPARSE_FLOAT here intentionally. We can't
// easily estimate the size of a sparse float vector. Callers of this
// method must handle this case themselves and must not pass the
// VECTOR_SPARSE_FLOAT data_type.
default: {
throw SegcoreError(DataTypeInvalid,
fmt::format("invalid type is {}", data_type));
Expand Down Expand Up @@ -100,6 +104,9 @@
case DataType::VECTOR_BFLOAT16: {
return "vector_bfloat16";
}
case DataType::VECTOR_SPARSE_FLOAT: {
return "vector_sparse_float";

Check warning on line 108 in internal/core/src/common/FieldMeta.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/FieldMeta.h#L107-L108

Added lines #L107 - L108 were not covered by tests
}
default: {
PanicInfo(DataTypeInvalid, "Unsupported DataType({})", data_type);
}
Expand All @@ -111,7 +118,13 @@
return datatype == DataType::VECTOR_BINARY ||
datatype == DataType::VECTOR_FLOAT ||
datatype == DataType::VECTOR_FLOAT16 ||
datatype == DataType::VECTOR_BFLOAT16;
datatype == DataType::VECTOR_BFLOAT16 ||
datatype == DataType::VECTOR_SPARSE_FLOAT;
}

// Tells whether datatype denotes a sparse vector type. Only
// DataType::VECTOR_SPARSE_FLOAT qualifies.
inline bool
datatype_is_sparse_vector(DataType datatype) {
    switch (datatype) {
        case DataType::VECTOR_SPARSE_FLOAT:
            return true;
        default:
            return false;
    }
}

inline bool
Expand Down Expand Up @@ -153,6 +166,7 @@
case DataType::STRING:
case DataType::ARRAY:
case DataType::JSON:
case DataType::VECTOR_SPARSE_FLOAT:
return true;
default:
return false;
Expand Down Expand Up @@ -217,6 +231,8 @@
Assert(datatype_is_array(type_));
}

// Any value may be passed as dim for a sparse vector field: it is never used,
// because get_dim() must not be invoked on a sparse vector field.
FieldMeta(const FieldName& name,
FieldId id,
DataType type,
Expand All @@ -232,6 +248,8 @@
// Returns the dimension stored in vector_info_ for this field.
// Asserts, in order, that the field is a vector type, that it is not a
// sparse vector, and that vector_info_ holds a value.
int64_t
get_dim() const {
Assert(datatype_is_vector(type_));
// should not attempt to get dim() of a sparse vector from schema.
Assert(!datatype_is_sparse_vector(type_));
Assert(vector_info_.has_value());
return vector_info_->dim_;
}
Expand Down Expand Up @@ -282,6 +300,9 @@

size_t
get_sizeof() const {
AssertInfo(!datatype_is_sparse_vector(type_),
"should not attempt to get_sizeof() of a sparse vector from "
"schema");
static const size_t ARRAY_SIZE = 128;
static const size_t JSON_SIZE = 512;
if (is_vector()) {
Expand Down
7 changes: 5 additions & 2 deletions internal/core/src/common/Schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) {
auto type_map = RepeatedKeyValToMap(child.type_params());
auto index_map = RepeatedKeyValToMap(child.index_params());

AssertInfo(type_map.count("dim"), "dim not found");
auto dim = boost::lexical_cast<int64_t>(type_map.at("dim"));
int64_t dim = 0;
if (!datatype_is_sparse_vector(data_type)) {
AssertInfo(type_map.count("dim"), "dim not found");
dim = boost::lexical_cast<int64_t>(type_map.at("dim"));
}
if (!index_map.count("metric_type")) {
schema->AddField(name, field_id, data_type, dim, std::nullopt);
} else {
Expand Down
9 changes: 0 additions & 9 deletions internal/core/src/common/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,6 @@ class Schema {
return fields_.at(field_id);
}

auto
get_total_sizeof() const {
return total_sizeof_;
}

FieldId
get_field_id(const FieldName& field_name) const {
AssertInfo(name_ids_.count(field_name), "Cannot find field_name");
Expand Down Expand Up @@ -181,9 +176,6 @@ class Schema {

fields_.emplace(field_id, field_meta);
field_ids_.emplace_back(field_id);

auto field_sizeof = field_meta.get_sizeof();
total_sizeof_ += field_sizeof;
}

private:
Expand All @@ -197,7 +189,6 @@ class Schema {
std::unordered_map<FieldName, FieldId> name_ids_; // field_name -> field_id
std::unordered_map<FieldId, FieldName> id_names_; // field_id -> field_name

int64_t total_sizeof_ = 0;
std::optional<FieldId> primary_field_id_opt_;
};

Expand Down
Loading
Loading