Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kdxf 2.3 #1

Open
wants to merge 5 commits into
base: 2.3-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package client

import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"time"

"google.golang.org/grpc"
Expand Down Expand Up @@ -127,7 +128,7 @@ type Client interface {
Flush(ctx context.Context, collName string, async bool) error
// FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error
// currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup)
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error)
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, map[string]milvuspb.MsgPosition, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// Delete deletes entries match expression
Expand Down
2 changes: 1 addition & 1 deletion client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (c *GrpcClient) NewCollection(ctx context.Context, collName string, dimensi
return err
}

idx := entity.NewGenericIndex("", "", map[string]string{
idx := entity.NewGenericIndex("", "", "", map[string]string{
"metric_type": string(opt.MetricsType),
})

Expand Down
3 changes: 2 additions & 1 deletion client/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ func (c *GrpcClient) DescribeIndex(ctx context.Context, collName string, fieldNa
params := entity.KvPairsMap(info.Params)
it := params["index_type"] // TODO change to const
idx := entity.NewGenericIndex(
info.IndexName,
info.GetIndexName(),
entity.IndexType(it),
info.GetFieldName(),
params,
)
indexes = append(indexes, idx)
Expand Down
43 changes: 35 additions & 8 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package client

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"time"
Expand Down Expand Up @@ -191,56 +192,82 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column
// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) error {
_, _, _, err := c.FlushV2(ctx, collName, async)
_, _, _, _, err := c.FlushV2(ctx, collName, async)
return err
}

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error) {
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, map[string]milvuspb.MsgPosition, error) {
if c.Service == nil {
return nil, nil, 0, ErrClientNotReady
return nil, nil, 0, nil, ErrClientNotReady
}
if err := c.checkCollectionExists(ctx, collName); err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
req := &milvuspb.FlushRequest{
DbName: "", // reserved,
CollectionNames: []string{collName},
}
resp, err := c.Service.Flush(ctx, req)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
channelCPs := resp.GetChannelCps()
flushTs := resp.GetCollFlushTs()[collName]
if !async {
segmentIDs, has := resp.GetCollSegIDs()[collName]
ids := segmentIDs.GetData()
if has && len(ids) > 0 {
flushed := func() bool {
resp, err := c.Service.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
SegmentIDs: ids,
FlushTs: flushTs,
})

if err != nil {
// TODO max retry
return false
}
if !resp.GetFlushed() {
for k, v := range resp.GetChannelCps() {
channelCPs[k] = v
}
}
return resp.GetFlushed()
}
for !flushed() {
// respect context deadline/cancel
select {
case <-ctx.Done():
return nil, nil, 0, errors.New("deadline exceeded")
return nil, nil, 0, nil, errors.New("deadline exceeded")
default:
}
time.Sleep(200 * time.Millisecond)
}
}
}
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil
channelCPEntities := make(map[string]milvuspb.MsgPosition, len(channelCPs))
for k, v := range channelCPs {
channelCPEntities[k] = milvuspb.MsgPosition{
ChannelName: v.GetChannelName(),
MsgID: v.GetMsgID(),
MsgGroup: v.GetMsgGroup(),
Timestamp: v.GetTimestamp(),
}
}
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], channelCPEntities, nil
}

func Base64MsgPosition(position *milvuspb.MsgPosition) string {
positionByte, err := proto.Marshal(position)
if err != nil {
return ""
}
return base64.StdEncoding.EncodeToString(positionByte)
}

// DeleteByPks deletes entries related to provided primary keys
Expand Down
8 changes: 8 additions & 0 deletions entity/MsgPosition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package entity

type MsgPosition struct {
ChannelName string
MsgID []byte
MsgGroup string
Timestamp uint64
}
54 changes: 35 additions & 19 deletions entity/genidx/genidx.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build ignore
// +build ignore

// Copyright (C) 2019-2021 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
Expand Down Expand Up @@ -55,6 +52,11 @@ func(i *Index{{.IdxName}}) IndexType() IndexType {
return IndexType("{{.IdxType}}")
}
// FieldName returns FieldName, implementing Index interface
func(i *Index{{.IdxName}}) FieldName() string {
return "{{.FieldName}}"
}
// SupportBinary returns whether index type support binary vector
func(i *Index{{.IdxName}}) SupportBinary() bool {
return {{.VectorSupport}} & 2 > 0
Expand Down Expand Up @@ -223,6 +225,7 @@ func TestIndex{{.IdxName}}SearchParam(t *testing.T) {
type idxDef struct {
IdxName string
IdxType entity.IndexType
FieldName string
VectorSupport int8
ConstructParams []idxParam
SearchParams []idxParam
Expand Down Expand Up @@ -329,6 +332,7 @@ func main() {
{
IdxName: "Flat",
IdxType: entity.Flat,
FieldName: "vec_field",
ConstructParams: []idxParam{},
SearchParams: []idxParam{},
ValidExamples: []string{
Expand All @@ -344,6 +348,7 @@ func main() {
{
IdxName: "BinFlat",
IdxType: entity.BinFlat,
FieldName: "vec_field",
VectorSupport: int8(binaryVectorSupport),
ConstructParams: []idxParam{
{
Expand Down Expand Up @@ -374,8 +379,9 @@ func main() {
},
// IVF_FLAT
{
IdxName: "IvfFlat",
IdxType: entity.IvfFlat,
IdxName: "IvfFlat",
IdxType: entity.IvfFlat,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -407,6 +413,7 @@ func main() {
{
IdxName: "BinIvfFlat",
IdxType: entity.BinIvfFlat,
FieldName: "vec_field",
VectorSupport: int8(binaryVectorSupport),
ConstructParams: []idxParam{
{
Expand Down Expand Up @@ -437,8 +444,9 @@ func main() {
},
// IVF_SQ8
{
IdxName: "IvfSQ8",
IdxType: entity.IvfSQ8,
IdxName: "IvfSQ8",
IdxType: entity.IvfSQ8,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -468,8 +476,9 @@ func main() {
},
// IVF_PQ
{
IdxName: "IvfPQ",
IdxType: entity.IvfPQ,
IdxName: "IvfPQ",
IdxType: entity.IvfPQ,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -509,8 +518,9 @@ func main() {
},
// HNSW
{
IdxName: "HNSW",
IdxType: entity.HNSW,
IdxName: "HNSW",
IdxType: entity.HNSW,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "M",
Expand Down Expand Up @@ -546,8 +556,9 @@ func main() {
},
// IVF_HNSW
{
IdxName: "IvfHNSW",
IdxType: entity.IvfHNSW,
IdxName: "IvfHNSW",
IdxType: entity.IvfHNSW,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -596,6 +607,7 @@ func main() {
{
IdxName: "DISKANN",
IdxType: entity.DISKANN,
FieldName: "vec_field",
ConstructParams: []idxParam{},
SearchParams: []idxParam{
{
Expand All @@ -618,6 +630,7 @@ func main() {
{
IdxName: "AUTOINDEX",
IdxType: entity.AUTOINDEX,
FieldName: "vec_field",
ConstructParams: []idxParam{},
SearchParams: []idxParam{
{
Expand All @@ -639,8 +652,9 @@ func main() {
},
},
{
IdxName: "GPUIvfFlat",
IdxType: entity.GPUIvfFlat,
IdxName: "GPUIvfFlat",
IdxType: entity.GPUIvfFlat,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -669,8 +683,9 @@ func main() {
},
},
{
IdxName: "GPUIvfPQ",
IdxType: entity.GPUIvfPQ,
IdxName: "GPUIvfPQ",
IdxType: entity.GPUIvfPQ,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down Expand Up @@ -709,8 +724,9 @@ func main() {
},
},
{
IdxName: "SCANN",
IdxType: entity.SCANN,
IdxName: "SCANN",
IdxType: entity.IvfFlat,
FieldName: "vec_field",
ConstructParams: []idxParam{
{
Name: "nlist",
Expand Down
18 changes: 13 additions & 5 deletions entity/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Index interface {
Name() string
IndexType() IndexType
Params() map[string]string
FieldName() string
}

// SearchParam interface for index related search param
Expand Down Expand Up @@ -104,8 +105,9 @@ func newBaseSearchParams() baseSearchParams {
}

type baseIndex struct {
it IndexType
name string
it IndexType
name string
fieldName string
}

// Name implements Index
Expand All @@ -118,6 +120,11 @@ func (b baseIndex) IndexType() IndexType {
return b.it
}

// FieldName implements Index
func (b baseIndex) FieldName() string {
return b.fieldName
}

// GenericIndex index struct for general usage
// no constraint for index is applied
type GenericIndex struct {
Expand All @@ -138,11 +145,12 @@ func (gi GenericIndex) Params() map[string]string {
}

// NewGenericIndex create generic index instance
func NewGenericIndex(name string, it IndexType, params map[string]string) Index {
func NewGenericIndex(name string, it IndexType, fieldName string, params map[string]string) Index {
return GenericIndex{
baseIndex: baseIndex{
it: it,
name: name,
it: it,
name: name,
fieldName: fieldName,
},
params: params,
}
Expand Down
2 changes: 1 addition & 1 deletion entity/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func TestGenericIndex(t *testing.T) {
rand.Seed(time.Now().UnixNano())
name := fmt.Sprintf("generic_index_%d", rand.Int())
gi := NewGenericIndex(name, IvfFlat, map[string]string{
gi := NewGenericIndex(name, IvfFlat, "field", map[string]string{
tMetricType: string(IP),
})
assert.Equal(t, name, gi.Name())
Expand Down
Loading
Loading