From 12d646adc4424a1cbe988bbd35689f0ba4de4e8e Mon Sep 17 00:00:00 2001 From: wayblink Date: Tue, 9 Apr 2024 10:34:31 +0800 Subject: [PATCH] Support CDC dev Signed-off-by: wayblink --- client/client.go | 2 +- client/insert.go | 27 +++++++++++++++++++-------- go.mod | 4 +++- go.sum | 4 ++-- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/client/client.go b/client/client.go index 9ca981b0..83272555 100644 --- a/client/client.go +++ b/client/client.go @@ -131,7 +131,7 @@ type Client interface { Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error // FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error // currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup) - FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error) + FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) // DeleteByPks deletes entries related to provided primary keys DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error // Delete deletes entries match expression diff --git a/client/insert.go b/client/insert.go index 2f52f0e2..04e3e92b 100644 --- a/client/insert.go +++ b/client/insert.go @@ -21,6 +21,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-sdk-go/v2/entity" @@ -191,18 +192,18 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column // Flush force collection to flush memory records into storage // in sync mode, flush will wait all segments to be flushed func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error { - _, _, _, err := c.FlushV2(ctx, collName, async, opts...) + _, _, _, _, err := c.FlushV2(ctx, collName, async, opts...) return err } // Flush force collection to flush memory records into storage // in sync mode, flush will wait all segments to be flushed -func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error) { +func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) { if c.Service == nil { - return nil, nil, 0, ErrClientNotReady + return nil, nil, 0, nil, ErrClientNotReady } if err := c.checkCollectionExists(ctx, collName); err != nil { - return nil, nil, 0, err + return nil, nil, 0, nil, err } req := &milvuspb.FlushRequest{ DbName: "", // reserved, @@ -213,11 +214,12 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o } resp, err := c.Service.Flush(ctx, req) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { - return nil, nil, 0, err + return nil, nil, 0, nil, err } + channelCPs := resp.GetChannelCps() if !async { segmentIDs, has := resp.GetCollSegIDs()[collName] ids := segmentIDs.GetData() @@ -238,14 +240,23 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o // respect context deadline/cancel select { case <-ctx.Done(): - return nil, nil, 0, errors.New("deadline exceeded") + return nil, nil, 0, nil, errors.New("deadline exceeded") default: } time.Sleep(200 * time.Millisecond) } } } - return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil + channelCPEntities := make(map[string]msgpb.MsgPosition, len(channelCPs)) + for k, v := range channelCPs { + channelCPEntities[k] = msgpb.MsgPosition{ + ChannelName: v.GetChannelName(), + MsgID: v.GetMsgID(), + MsgGroup: v.GetMsgGroup(), + Timestamp: v.GetTimestamp(), + } + } + return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], channelCPEntities, nil } // DeleteByPks deletes entries related to provided primary keys diff --git a/go.mod b/go.mod index e61dae6f..bb5c0c58 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/go-faker/faker/v4 v4.1.0 github.com/golang/protobuf v1.5.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 - github.com/milvus-io/milvus-proto/go-api/v2 v2.4.0 + github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2 github.com/stretchr/testify v1.8.1 github.com/tidwall/gjson v1.14.4 github.com/x448/float16 v0.8.4 @@ -36,3 +36,5 @@ require ( google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +//replace github.com/milvus-io/milvus-proto/go-api/v2 => github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20240407121829-21199b66810a diff --git a/go.sum b/go.sum index bbc35239..6f4bf33e 100644 --- a/go.sum +++ b/go.sum @@ -157,8 +157,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= -github.com/milvus-io/milvus-proto/go-api/v2 v2.4.0 h1:9KsyZR+neMlRvV52C5sIsLB13jthT2AY/+buyiNjcCg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.4.0/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2 h1:jgXBS8x8DTriF2pEI0RH/A+eJ8NI1f51iJcdiYEZOBg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=