-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: [Do Not Merge] Add Sparse Float Vector support to milvus #29421
Changes from 1 commit
53ec559
907a83b
b4217f5
80a7531
087375d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
milvus components, including proxy, data node to receive and write sparse float vectors to binlog, query node to handle search requests, index node to build index for sparse float column, etc. Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ import ( | |
"github.com/golang/protobuf/proto" | ||
|
||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" | ||
"github.com/milvus-io/milvus/pkg/util/typeutil" | ||
) | ||
|
||
// PayloadReader reads data from payload | ||
|
@@ -73,6 +74,8 @@ func (r *PayloadReader) GetDataFromPayload() (interface{}, int, error) { | |
return r.GetFloat16VectorFromPayload() | ||
case schemapb.DataType_BFloat16Vector: | ||
return r.GetBFloat16VectorFromPayload() | ||
case schemapb.DataType_SparseFloatVector: | ||
return r.GetSparseFloatVectorFromPayload() | ||
case schemapb.DataType_String, schemapb.DataType_VarChar: | ||
val, err := r.GetStringFromPayload() | ||
return val, 0, err | ||
|
@@ -429,6 +432,36 @@ func (r *PayloadReader) GetFloatVectorFromPayload() ([]float32, int, error) { | |
return ret, dim, nil | ||
} | ||
|
||
func (r *PayloadReader) GetSparseFloatVectorFromPayload() (*SparseFloatVectorFieldData, int, error) { | ||
if !typeutil.IsSparseVectorType(r.colType) { | ||
return nil, -1, fmt.Errorf("failed to get sparse float vector from datatype %v", r.colType.String()) | ||
} | ||
values := make([]parquet.ByteArray, r.numRows) | ||
valuesRead, err := ReadDataFromAllRowGroups[parquet.ByteArray, *file.ByteArrayColumnChunkReader](r.reader, values, 0, r.numRows) | ||
if err != nil { | ||
return nil, -1, err | ||
} | ||
if valuesRead != r.numRows { | ||
return nil, -1, fmt.Errorf("expect %d binary, but got = %d", r.numRows, valuesRead) | ||
} | ||
|
||
fieldData := &SparseFloatVectorFieldData{} | ||
|
||
for _, value := range values { | ||
if len(value)%8 != 0 { | ||
return nil, -1, fmt.Errorf("invalid bytesData length") | ||
} | ||
|
||
fieldData.Contents = append(fieldData.Contents, value) | ||
rowDim := typeutil.SparseFloatRowDim(value) | ||
if rowDim > fieldData.Dim { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this dimension useful anywhere? or it just used as a meta information? |
||
fieldData.Dim = rowDim | ||
} | ||
} | ||
|
||
return fieldData, int(fieldData.Dim), nil | ||
} | ||
|
||
func (r *PayloadReader) GetPayloadLengthFromReader() (int, error) { | ||
return int(r.numRows), nil | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package indexparamcheck | ||
|
||
import ( | ||
"fmt" | ||
"strconv" | ||
|
||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" | ||
"github.com/milvus-io/milvus/pkg/common" | ||
) | ||
|
||
// sparse vectors do not check dim, but baseChecker does, so baseChecker is not included | ||
type sparseFloatVectorBaseChecker struct{} | ||
|
||
func (c sparseFloatVectorBaseChecker) StaticCheck(params map[string]string) error { | ||
if !CheckStrByValues(params, Metric, SparseMetrics) { | ||
return fmt.Errorf("metric type not found or not supported, supported: %v", SparseMetrics) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c sparseFloatVectorBaseChecker) CheckTrain(params map[string]string) error { | ||
dropRatioBuildStr, exist := params[SparseDropRatioBuild] | ||
if exist { | ||
dropRatioBuild, err := strconv.ParseFloat(dropRatioBuildStr, 64) | ||
if err != nil || dropRatioBuild < 0 || dropRatioBuild >= 1 { | ||
return fmt.Errorf("invalid drop_ratio_build: %s, must be in range [0, 1)", dropRatioBuildStr) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c sparseFloatVectorBaseChecker) CheckValidDataType(dType schemapb.DataType) error { | ||
if dType != schemapb.DataType_SparseFloatVector { | ||
return fmt.Errorf("only sparse float vector is supported for the specified index tpye") | ||
} | ||
return nil | ||
} | ||
|
||
func (c sparseFloatVectorBaseChecker) SetDefaultMetricTypeIfNotExist(params map[string]string) { | ||
setDefaultIfNotExist(params, common.MetricTypeKey, SparseFloatVectorDefaultMetricType) | ||
} | ||
|
||
func newSparseFloatVectorBaseChecker() IndexChecker { | ||
return &sparseFloatVectorBaseChecker{} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package indexparamcheck | ||
|
||
type sparseInvertedIndexChecker struct { | ||
sparseFloatVectorBaseChecker | ||
} | ||
|
||
func newSparseInvertedIndexChecker() *sparseInvertedIndexChecker { | ||
return &sparseInvertedIndexChecker{} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
// Licensed to the LF AI & Data foundation under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package testutils | ||
|
||
import ( | ||
"encoding/binary" | ||
"math" | ||
"math/rand" | ||
"sort" | ||
|
||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" | ||
) | ||
|
||
// SparseFloatRowSetAt writes element number pos of row: idx as a
// little-endian uint32 at byte offset pos*8, followed by the IEEE-754 bits of
// value as a little-endian uint32 at byte offset pos*8+4. row must already be
// long enough; no bounds are checked here beyond what slicing enforces.
func SparseFloatRowSetAt(row []byte, pos int, idx uint32, value float32) {
	base := pos * 8
	valueBits := math.Float32bits(value)
	binary.LittleEndian.PutUint32(row[base:], idx)
	binary.LittleEndian.PutUint32(row[base+4:], valueBits)
}
|
||
func CreateSparseFloatRow(indices []uint32, values []float32) []byte { | ||
row := make([]byte, len(indices)*8) | ||
for i := 0; i < len(indices); i++ { | ||
SparseFloatRowSetAt(row, i, indices[i], values[i]) | ||
} | ||
return row | ||
} | ||
|
||
func GenerateSparseFloatVectors(numRows int) *schemapb.SparseFloatArray { | ||
dim := 700 | ||
avgNnz := 20 | ||
var contents [][]byte | ||
maxDim := 0 | ||
|
||
uniqueAndSort := func(indices []uint32) []uint32 { | ||
seen := make(map[uint32]bool) | ||
var result []uint32 | ||
for _, value := range indices { | ||
if _, ok := seen[value]; !ok { | ||
seen[value] = true | ||
result = append(result, value) | ||
} | ||
} | ||
sort.Slice(result, func(i, j int) bool { | ||
return result[i] < result[j] | ||
}) | ||
return result | ||
} | ||
|
||
for i := 0; i < numRows; i++ { | ||
nnz := rand.Intn(avgNnz*2) + 1 | ||
indices := make([]uint32, 0, nnz) | ||
for j := 0; j < nnz; j++ { | ||
indices = append(indices, uint32(rand.Intn(dim))) | ||
} | ||
indices = uniqueAndSort(indices) | ||
values := make([]float32, 0, len(indices)) | ||
for j := 0; j < len(indices); j++ { | ||
values = append(values, rand.Float32()) | ||
} | ||
if len(indices) > 0 && int(indices[len(indices)-1])+1 > maxDim { | ||
maxDim = int(indices[len(indices)-1]) + 1 | ||
} | ||
row_bytes := CreateSparseFloatRow(indices, values) | ||
|
||
contents = append(contents, row_bytes) | ||
} | ||
return &schemapb.SparseFloatArray{ | ||
Dim: int64(maxDim), | ||
Contents: contents, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,6 +159,12 @@ func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLe | |
break | ||
} | ||
} | ||
case schemapb.DataType_SparseFloatVector: | ||
// TODO(SPARSE, zhengbuqian): size of sparse float vector | ||
// varies depending on the number of non-zeros. Using sparse vector | ||
// generated by SPLADE as reference and returning size of a sparse | ||
// vector with 150 non-zeros. | ||
res += 1200 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is dangerous. |
||
} | ||
} | ||
return res, nil | ||
|
@@ -235,6 +241,11 @@ func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, e | |
res += int(fs.GetVectors().GetDim()) | ||
case schemapb.DataType_FloatVector: | ||
res += int(fs.GetVectors().GetDim() * 4) | ||
case schemapb.DataType_SparseFloatVector: | ||
vec := fs.GetVectors().GetSparseFloatVector() | ||
// counting only the size of the vector data, ignoring other | ||
// bytes used in proto. | ||
res += len(vec.Contents[rowOffset]) | ||
} | ||
} | ||
return res, nil | ||
|
@@ -359,13 +370,17 @@ func (helper *SchemaHelper) GetVectorDimFromID(fieldID int64) (int, error) { | |
// IsVectorType returns true if input is a vector type, otherwise false | ||
func IsVectorType(dataType schemapb.DataType) bool { | ||
switch dataType { | ||
case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: | ||
case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: | ||
return true | ||
default: | ||
return false | ||
} | ||
} | ||
|
||
func IsSparseVectorType(dataType schemapb.DataType) bool { | ||
return dataType == schemapb.DataType_SparseFloatVector | ||
} | ||
|
||
// IsIntegerType returns true if input is an integer type, otherwise false | ||
func IsIntegerType(dataType schemapb.DataType) bool { | ||
switch dataType { | ||
|
@@ -516,6 +531,15 @@ func PrepareResultFieldData(sample []*schemapb.FieldData, topK int64) []*schemap | |
vectors.Vectors.Data = &schemapb.VectorField_BinaryVector{ | ||
BinaryVector: make([]byte, 0, topK*dim/8), | ||
} | ||
case *schemapb.VectorField_SparseFloatVector: | ||
vectors.Vectors.Data = &schemapb.VectorField_SparseFloatVector{ | ||
SparseFloatVector: &schemapb.SparseFloatArray{ | ||
// dim to be updated when appending data. | ||
Dim: 0, | ||
Contents: make([][]byte, 0, topK), | ||
}, | ||
} | ||
vectors.Vectors.Dim = 0 | ||
} | ||
fd.Field = vectors | ||
} | ||
|
@@ -525,7 +549,7 @@ func PrepareResultFieldData(sample []*schemapb.FieldData, topK int64) []*schemap | |
} | ||
|
||
// AppendFieldData appends fields data of specified index from src to dst | ||
func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx int64) (appendSize int64) { | ||
func AppendFieldData(dst, src []*schemapb.FieldData, idx int64) (appendSize int64) { | ||
for i, fieldData := range src { | ||
switch fieldType := fieldData.Field.(type) { | ||
case *schemapb.FieldData_Scalars: | ||
|
@@ -711,6 +735,18 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i | |
} | ||
/* #nosec G103 */ | ||
appendSize += int64(unsafe.Sizeof(srcVector.Bfloat16Vector[idx*(dim*2) : (idx+1)*(dim*2)])) | ||
case *schemapb.VectorField_SparseFloatVector: | ||
if dstVector.GetSparseFloatVector() == nil { | ||
dstVector.Data = &schemapb.VectorField_SparseFloatVector{ | ||
SparseFloatVector: &schemapb.SparseFloatArray{ | ||
Dim: 0, | ||
Contents: make([][]byte, 0), | ||
}, | ||
} | ||
dstVector.Dim = int64(srcVector.SparseFloatVector.Dim) | ||
} | ||
vec := dstVector.Data.(*schemapb.VectorField_SparseFloatVector).SparseFloatVector | ||
appendSize += appendSparseFloatArraySingleRow(vec, srcVector.SparseFloatVector, idx) | ||
default: | ||
log.Error("Not supported field type", zap.String("field type", fieldData.Type.String())) | ||
} | ||
|
@@ -767,6 +803,8 @@ func DeleteFieldData(dst []*schemapb.FieldData) { | |
case *schemapb.VectorField_Bfloat16Vector: | ||
dstBfloat16Vector := dstVector.Data.(*schemapb.VectorField_Bfloat16Vector) | ||
dstBfloat16Vector.Bfloat16Vector = dstBfloat16Vector.Bfloat16Vector[:len(dstBfloat16Vector.Bfloat16Vector)-int(dim*2)] | ||
case *schemapb.VectorField_SparseFloatVector: | ||
trimSparseFloatArray(dstVector.GetSparseFloatVector()) | ||
default: | ||
log.Error("wrong field type added", zap.String("field type", fieldData.Type.String())) | ||
} | ||
|
@@ -929,6 +967,14 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error | |
} else { | ||
dstVector.GetFloatVector().Data = append(dstVector.GetFloatVector().Data, srcVector.FloatVector.Data...) | ||
} | ||
case *schemapb.VectorField_SparseFloatVector: | ||
if dstVector.GetSparseFloatVector() == nil { | ||
dstVector.Data = &schemapb.VectorField_SparseFloatVector{ | ||
SparseFloatVector: srcVector.SparseFloatVector, | ||
} | ||
} else { | ||
appendSparseFloatArray(dstVector.GetSparseFloatVector(), srcVector.SparseFloatVector) | ||
} | ||
default: | ||
log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String())) | ||
return errors.New("unsupported data type: " + srcFieldData.Type.String()) | ||
|
@@ -1166,6 +1212,8 @@ func GetData(field *schemapb.FieldData, idx int) interface{} { | |
dim := int(field.GetVectors().GetDim()) | ||
dataBytes := dim * 2 | ||
return field.GetVectors().GetBfloat16Vector()[idx*dataBytes : (idx+1)*dataBytes] | ||
case schemapb.DataType_SparseFloatVector: | ||
return field.GetVectors().GetSparseFloatVector().Contents[idx] | ||
} | ||
return nil | ||
} | ||
|
@@ -1325,3 +1373,81 @@ func AppendGroupByValue(dstResData *schemapb.SearchResultData, | |
} | ||
return nil | ||
} | ||
|
||
func appendSparseFloatArray(dst, src *schemapb.SparseFloatArray) { | ||
if len(src.Contents) == 0 { | ||
return | ||
} | ||
if dst.Dim < src.Dim { | ||
dst.Dim = src.Dim | ||
} | ||
dst.Contents = append(dst.Contents, src.Contents...) | ||
} | ||
|
||
// return the size of indices and values of the appended row | ||
func appendSparseFloatArraySingleRow(dst, src *schemapb.SparseFloatArray, idx int64) int64 { | ||
row := src.Contents[idx] | ||
dst.Contents = append(dst.Contents, row) | ||
rowDim := SparseFloatRowDim(row) | ||
if rowDim == 0 { | ||
return 0 | ||
} | ||
if dst.Dim < rowDim { | ||
dst.Dim = rowDim | ||
} | ||
return int64(len(row)) | ||
} | ||
|
||
func trimSparseFloatArray(vec *schemapb.SparseFloatArray) { | ||
if len(vec.Contents) == 0 { | ||
return | ||
} | ||
// not decreasing dim of the entire SparseFloatArray, as we don't want to | ||
// iterate through the entire array to find the new max dim. Correctness | ||
// will not be affected. | ||
vec.Contents = vec.Contents[:len(vec.Contents)-1] | ||
} | ||
|
||
func ValidateSparseFloatRows(rows ...[]byte) error { | ||
for _, row := range rows { | ||
if len(row) == 0 { | ||
return errors.New("empty sparse float vector row") | ||
} | ||
if len(row)%8 != 0 { | ||
return fmt.Errorf("invalid data length in sparse float vector: %d", len(row)) | ||
} | ||
for i := 0; i < SparseFloatRowElementCount(row); i++ { | ||
if i > 0 && SparseFloatRowIndexAt(row, i) < SparseFloatRowIndexAt(row, i-1) { | ||
return errors.New("unsorted indices in sparse float vector") | ||
} | ||
VerifyFloat(float64(SparseFloatRowValueAt(row, i))) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// SparseFloatRowUtils

// SparseFloatRowElementCount returns the number of whole 8-byte elements in
// row, which is 0 for a nil row.
func SparseFloatRowElementCount(row []byte) int {
	const elementSize = 8
	return len(row) / elementSize
}
|
||
// does not check for out-of-range access | ||
func SparseFloatRowIndexAt(row []byte, idx int) uint32 { | ||
return common.Endian.Uint32(row[idx*8:]) | ||
} | ||
|
||
// does not check for out-of-range access | ||
func SparseFloatRowValueAt(row []byte, idx int) float32 { | ||
return math.Float32frombits(common.Endian.Uint32(row[idx*8+4:])) | ||
} | ||
|
||
// dim of a sparse float vector is the maximum/last index + 1 | ||
func SparseFloatRowDim(row []byte) int64 { | ||
if row == nil || len(row) == 0 { | ||
return 0 | ||
} | ||
return int64(SparseFloatRowIndexAt(row, SparseFloatRowElementCount(row)-1)) + 1 | ||
} |
Large diffs are not rendered by default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this cause a nil dereference?
![Uploading image.png…]()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need to check here that every sparse vector has at least one nonzero value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What other limitations do we have on sparse embeddings?