Skip to content

Commit

Permalink
cdc-dev
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Apr 7, 2024
1 parent e553ab6 commit 1561dba
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 11 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type Client interface {
Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error
// FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error
// currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup)
FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error)
FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// Delete deletes entries match expression
Expand Down
34 changes: 26 additions & 8 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

"github.com/milvus-io/milvus-sdk-go/v2/entity"
Expand Down Expand Up @@ -191,18 +192,18 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column
// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error {
_, _, _, err := c.FlushV2(ctx, collName, async, opts...)
_, _, _, _, err := c.FlushV2(ctx, collName, async, opts...)
return err
}

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error) {
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) {
if c.Service == nil {
return nil, nil, 0, ErrClientNotReady
return nil, nil, 0, nil, ErrClientNotReady
}
if err := c.checkCollectionExists(ctx, collName); err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
req := &milvuspb.FlushRequest{
DbName: "", // reserved,
Expand All @@ -213,37 +214,54 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o
}
resp, err := c.Service.Flush(ctx, req)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
channelCPs := resp.GetChannelCps()
flushTs := resp.GetCollFlushTs()[collName]
if !async {
segmentIDs, has := resp.GetCollSegIDs()[collName]
ids := segmentIDs.GetData()
if has && len(ids) > 0 {
flushed := func() bool {
resp, err := c.Service.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
SegmentIDs: ids,
FlushTs: flushTs,
})
if err != nil {
// TODO max retry
return false
}
if !resp.GetFlushed() {
for k, v := range resp.GetChannelCps() {
channelCPs[k] = v
}
}
return resp.GetFlushed()
}
for !flushed() {
// respect context deadline/cancel
select {
case <-ctx.Done():
return nil, nil, 0, errors.New("deadline exceeded")
return nil, nil, 0, nil, errors.New("deadline exceeded")
default:
}
time.Sleep(200 * time.Millisecond)
}
}
}
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil
channelCPEntities := make(map[string]msgpb.MsgPosition, len(channelCPs))
for k, v := range channelCPs {
channelCPEntities[k] = msgpb.MsgPosition{
ChannelName: v.GetChannelName(),
MsgID: v.GetMsgID(),
MsgGroup: v.GetMsgGroup(),
Timestamp: v.GetTimestamp(),
}
}
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], channelCPEntities, nil
}

// DeleteByPks deletes entries related to provided primary keys
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ require (
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/milvus-io/milvus-proto/go-api/v2 => github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20240407121829-21199b66810a
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240109020841-d367b5a59df1 h1:oNpMivd94JAMhdSVsFw8t1b+olXz8pbzd5PES21sth8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240109020841-d367b5a59df1/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down Expand Up @@ -232,6 +230,8 @@ github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBn
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20240407121829-21199b66810a h1:t+ppRQ4tzIjNUjdfhxSfHKqUX0fJjADhQnuSQpLvmg8=
github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20240407121829-21199b66810a/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
Expand Down

0 comments on commit 1561dba

Please sign in to comment.