From 1561dba17a9a1a3f55b32fb207974a6224d85800 Mon Sep 17 00:00:00 2001 From: wayblink Date: Sun, 7 Apr 2024 20:25:34 +0800 Subject: [PATCH] cdc-dev Signed-off-by: wayblink --- client/client.go | 2 +- client/insert.go | 34 ++++++++++++++++++++++++++-------- go.mod | 2 ++ go.sum | 4 ++-- 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/client/client.go b/client/client.go index 8787fe874..ffde07648 100644 --- a/client/client.go +++ b/client/client.go @@ -129,7 +129,7 @@ type Client interface { Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error // FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error // currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup) - FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error) + FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) // DeleteByPks deletes entries related to provided primary keys DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error // Delete deletes entries match expression diff --git a/client/insert.go b/client/insert.go index fe24db404..cf5e34871 100644 --- a/client/insert.go +++ b/client/insert.go @@ -21,6 +21,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-sdk-go/v2/entity" @@ -191,18 +192,18 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column // Flush force collection to flush memory records into storage // in sync mode, flush will wait all segments to be flushed func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error { - _, _, _, err := c.FlushV2(ctx, collName, async, opts...) + _, _, _, _, err := c.FlushV2(ctx, collName, async, opts...) return err } // Flush force collection to flush memory records into storage // in sync mode, flush will wait all segments to be flushed -func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error) { +func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) { if c.Service == nil { - return nil, nil, 0, ErrClientNotReady + return nil, nil, 0, nil, ErrClientNotReady } if err := c.checkCollectionExists(ctx, collName); err != nil { - return nil, nil, 0, err + return nil, nil, 0, nil, err } req := &milvuspb.FlushRequest{ DbName: "", // reserved, @@ -213,11 +214,13 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o } resp, err := c.Service.Flush(ctx, req) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, nil, err } if err := handleRespStatus(resp.GetStatus()); err != nil { - return nil, nil, 0, err + return nil, nil, 0, nil, err } + channelCPs := resp.GetChannelCps() + flushTs := resp.GetCollFlushTs()[collName] if !async { segmentIDs, has := resp.GetCollSegIDs()[collName] ids := segmentIDs.GetData() @@ -225,25 +228,40 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o flushed := func() bool { resp, err := c.Service.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{ SegmentIDs: ids, + FlushTs: flushTs, }) if err != nil { // TODO max retry return false } + if !resp.GetFlushed() { + for k, v := range resp.GetChannelCps() { + channelCPs[k] = v + } + } return resp.GetFlushed() } for !flushed() { // respect context deadline/cancel select { case <-ctx.Done(): - return nil, nil, 0, errors.New("deadline exceeded") + return nil, nil, 0, nil, errors.New("deadline exceeded") default: } time.Sleep(200 * time.Millisecond) } } } - return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil + channelCPEntities := make(map[string]msgpb.MsgPosition, len(channelCPs)) + for k, v := range channelCPs { + channelCPEntities[k] = msgpb.MsgPosition{ + ChannelName: v.GetChannelName(), + MsgID: v.GetMsgID(), + MsgGroup: v.GetMsgGroup(), + Timestamp: v.GetTimestamp(), + } + } + return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], channelCPEntities, nil } // DeleteByPks deletes entries related to provided primary keys diff --git a/go.mod b/go.mod index b0ee74e3b..f15453d49 100644 --- a/go.mod +++ b/go.mod @@ -35,3 +35,5 @@ require ( google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/milvus-io/milvus-proto/go-api/v2 => github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20240407121829-21199b66810a diff --git a/go.sum b/go.sum index a2c251a70..8c61b895b 100644 --- a/go.sum +++ b/go.sum @@ -157,8 +157,6 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240109020841-d367b5a59df1 h1:oNpMivd94JAMhdSVsFw8t1b+olXz8pbzd5PES21sth8= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240109020841-d367b5a59df1/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -232,6 +230,8 @@ github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBn github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20240407121829-21199b66810a h1:t+ppRQ4tzIjNUjdfhxSfHKqUX0fJjADhQnuSQpLvmg8= +github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20240407121829-21199b66810a/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=