diff --git a/client/client.go b/client/client.go index 1811781c..cfc1a883 100644 --- a/client/client.go +++ b/client/client.go @@ -134,7 +134,7 @@ type Client interface { Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error // FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error // currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup) - FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error) + FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) // DeleteByPks deletes entries related to provided primary keys DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error // Delete deletes entries match expression diff --git a/client/insert.go b/client/insert.go index d7b91dfa..652601d7 100644 --- a/client/insert.go +++ b/client/insert.go @@ -21,6 +21,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-sdk-go/v2/entity" @@ -185,18 +186,18 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column // Flush force collection to flush memory records into storage // in sync mode, flush will wait all segments to be flushed func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error { - _, _, _, err := c.FlushV2(ctx, collName, async, opts...) + _, _, _, _, err := c.FlushV2(ctx, collName, async, opts...) return err } // Flush force collection to flush memory records into storage // in sync mode, flush will wait all segments to be flushed -func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error) { +func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) { if c.Service == nil { - return nil, nil, 0, ErrClientNotReady + return nil, nil, 0, nil, ErrClientNotReady } if err := c.checkCollectionExists(ctx, collName); err != nil { - return nil, nil, 0, err + return nil, nil, 0, nil, err } req := &milvuspb.FlushRequest{ DbName: "", // reserved, @@ -207,11 +208,12 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o } resp, err := c.Service.Flush(ctx, req) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { - return nil, nil, 0, err + return nil, nil, 0, nil, err } + channelCPs := resp.GetChannelCps() if !async { segmentIDs, has := resp.GetCollSegIDs()[collName] ids := segmentIDs.GetData() @@ -232,14 +234,23 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o // respect context deadline/cancel select { case <-ctx.Done(): - return nil, nil, 0, errors.New("deadline exceeded") + return nil, nil, 0, nil, errors.New("deadline exceeded") default: } time.Sleep(200 * time.Millisecond) } } } - return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil + channelCPEntities := make(map[string]msgpb.MsgPosition, len(channelCPs)) + for k, v := range channelCPs { + channelCPEntities[k] = msgpb.MsgPosition{ + ChannelName: v.GetChannelName(), + MsgID: v.GetMsgID(), + MsgGroup: v.GetMsgGroup(), + Timestamp: v.GetTimestamp(), + } + } + return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], channelCPEntities, nil } // DeleteByPks deletes entries related to provided primary keys