Skip to content

Commit

Permalink
Support the replicate message api
Browse files Browse the repository at this point in the history
  • Loading branch information
SimFG committed Nov 9, 2023
1 parent 415f277 commit 27c81ab
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 174 deletions.
18 changes: 13 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

"google.golang.org/grpc"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"

"github.com/milvus-io/milvus-sdk-go/v2/entity"
)

Expand All @@ -38,9 +40,9 @@ type Client interface {
// ListDatabases list all database in milvus cluster.
ListDatabases(ctx context.Context) ([]entity.Database, error)
// CreateDatabase create database with the given name.
CreateDatabase(ctx context.Context, dbName string) error
CreateDatabase(ctx context.Context, dbName string, opts ...CreateDatabaseOption) error
// DropDatabase drop database with the given db name.
DropDatabase(ctx context.Context, dbName string) error
DropDatabase(ctx context.Context, dbName string, opts ...DropDatabaseOption) error

// -- collection --

Expand All @@ -53,13 +55,13 @@ type Client interface {
// DescribeCollection describe collection meta
DescribeCollection(ctx context.Context, collName string) (*entity.Collection, error)
// DropCollection drop the specified collection
DropCollection(ctx context.Context, collName string) error
DropCollection(ctx context.Context, collName string, opts ...DropCollectionOption) error
// GetCollectionStatistics get collection statistics
GetCollectionStatistics(ctx context.Context, collName string) (map[string]string, error)
// LoadCollection load collection into memory
LoadCollection(ctx context.Context, collName string, async bool, opts ...LoadCollectionOption) error
// ReleaseCollection release loaded collection
ReleaseCollection(ctx context.Context, collName string) error
ReleaseCollection(ctx context.Context, collName string, opts ...ReleaseCollectionOption) error
// HasCollection check whether collection exists
HasCollection(ctx context.Context, collName string) (bool, error)
// RenameCollection performs renaming for provided collection.
Expand Down Expand Up @@ -127,7 +129,7 @@ type Client interface {
Flush(ctx context.Context, collName string, async bool) error
// FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error
// currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup)
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error)
FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// Delete deletes entries match expression
Expand Down Expand Up @@ -211,6 +213,12 @@ type Client interface {
GetVersion(ctx context.Context) (string, error)
// CheckHealth returns milvus state
CheckHealth(ctx context.Context) (*entity.MilvusState, error)

ReplicateMessage(ctx context.Context,
channelName string, beginTs, endTs uint64,
msgsBytes [][]byte, startPositions, endPositions []*msgpb.MsgPosition,
opts ...ReplicateMessageOption,
) (*entity.MessageInfo, error)
}

// NewClient create a client connected to remote milvus cluster.
Expand Down
12 changes: 10 additions & 2 deletions client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/errors"

"github.com/golang/protobuf/proto"

"github.com/milvus-io/milvus-sdk-go/v2/entity"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -151,6 +152,7 @@ func (c *GrpcClient) requestCreateCollection(ctx context.Context, sch *entity.Sc
}

req := &milvuspb.CreateCollectionRequest{
Base: opt.MsgBase,
DbName: "", // reserved fields, not used for now
CollectionName: sch.CollectionName,
Schema: bs,
Expand Down Expand Up @@ -279,7 +281,7 @@ func (c *GrpcClient) DescribeCollection(ctx context.Context, collName string) (*
}

// DropCollection drop collection by name
func (c *GrpcClient) DropCollection(ctx context.Context, collName string) error {
func (c *GrpcClient) DropCollection(ctx context.Context, collName string, opts ...DropCollectionOption) error {
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -290,6 +292,9 @@ func (c *GrpcClient) DropCollection(ctx context.Context, collName string) error
req := &milvuspb.DropCollectionRequest{
CollectionName: collName,
}
for _, opt := range opts {
opt(req)
}
resp, err := c.Service.DropCollection(ctx, req)
if err != nil {
return err
Expand Down Expand Up @@ -447,7 +452,7 @@ func (c *GrpcClient) LoadCollection(ctx context.Context, collName string, async
}

// ReleaseCollection release loaded collection
func (c *GrpcClient) ReleaseCollection(ctx context.Context, collName string) error {
func (c *GrpcClient) ReleaseCollection(ctx context.Context, collName string, opts ...ReleaseCollectionOption) error {
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -459,6 +464,9 @@ func (c *GrpcClient) ReleaseCollection(ctx context.Context, collName string) err
DbName: "", // reserved
CollectionName: collName,
}
for _, opt := range opts {
opt(req)
}
resp, err := c.Service.ReleaseCollection(ctx, req)
if err != nil {
return err
Expand Down
10 changes: 8 additions & 2 deletions client/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *GrpcClient) UsingDatabase(ctx context.Context, dbName string) error {

// CreateDatabase creates a new database for remote Milvus cluster.
// TODO:New options can be added as expanding parameters.
func (c *GrpcClient) CreateDatabase(ctx context.Context, dbName string) error {
func (c *GrpcClient) CreateDatabase(ctx context.Context, dbName string, opts ...CreateDatabaseOption) error {
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -50,6 +50,9 @@ func (c *GrpcClient) CreateDatabase(ctx context.Context, dbName string) error {
req := &milvuspb.CreateDatabaseRequest{
DbName: dbName,
}
for _, opt := range opts {
opt(req)
}
resp, err := c.Service.CreateDatabase(ctx, req)
if err != nil {
return err
Expand Down Expand Up @@ -84,7 +87,7 @@ func (c *GrpcClient) ListDatabases(ctx context.Context) ([]entity.Database, erro
}

// DropDatabase drop all database in milvus cluster.
func (c *GrpcClient) DropDatabase(ctx context.Context, dbName string) error {
func (c *GrpcClient) DropDatabase(ctx context.Context, dbName string, opts ...DropDatabaseOption) error {
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -95,6 +98,9 @@ func (c *GrpcClient) DropDatabase(ctx context.Context, dbName string) error {
req := &milvuspb.DropDatabaseRequest{
DbName: dbName,
}
for _, opt := range opts {
opt(req)
}
resp, err := c.Service.DropDatabase(ctx, req)
if err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions client/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type indexDef struct {
name string
fieldName string
collectionName string
MsgBase *commonpb.MsgBase
}

// IndexOption is the predefined function to alter index def.
Expand All @@ -71,6 +72,12 @@ func WithIndexName(name string) IndexOption {
}
}

func WithIndexMsgBase(msgBase *commonpb.MsgBase) IndexOption {
return func(def *indexDef) {
def.MsgBase = msgBase
}
}

func getIndexDef(opts ...IndexOption) indexDef {
idxDef := indexDef{}
for _, opt := range opts {
Expand All @@ -93,6 +100,7 @@ func (c *GrpcClient) CreateIndex(ctx context.Context, collName string, fieldName
idxDef := getIndexDef(opts...)

req := &milvuspb.CreateIndexRequest{
Base: idxDef.MsgBase,
DbName: "", // reserved
CollectionName: collName,
FieldName: fieldName,
Expand Down Expand Up @@ -167,6 +175,7 @@ func (c *GrpcClient) DropIndex(ctx context.Context, collName string, fieldName s

idxDef := getIndexDef(opts...)
req := &milvuspb.DropIndexRequest{
Base: idxDef.MsgBase,
DbName: "", //reserved,
CollectionName: collName,
FieldName: fieldName,
Expand Down
9 changes: 6 additions & 3 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) error {
_, _, _, err := c.FlushV2(ctx, collName, async)
func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error {
_, _, _, err := c.FlushV2(ctx, collName, async, opts...)
return err
}

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error) {
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error) {
if c.Service == nil {
return nil, nil, 0, ErrClientNotReady
}
Expand All @@ -208,6 +208,9 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) (
DbName: "", // reserved,
CollectionNames: []string{collName},
}
for _, opt := range opts {
opt(req)
}
resp, err := c.Service.Flush(ctx, req)
if err != nil {
return nil, nil, 0, err
Expand Down
41 changes: 41 additions & 0 deletions client/mq_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package client

import (
"context"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
)

func (c *GrpcClient) ReplicateMessage(ctx context.Context,
channelName string, beginTs, endTs uint64,
msgsBytes [][]byte, startPositions, endPositions []*msgpb.MsgPosition,
opts ...ReplicateMessageOption) (*entity.MessageInfo, error) {

if c.Service == nil {
return nil, ErrClientNotReady
}
req := &milvuspb.ReplicateMessageRequest{
ChannelName: channelName,
BeginTs: beginTs,
EndTs: endTs,
Msgs: msgsBytes,
StartPositions: startPositions,
EndPositions: endPositions,
}
for _, opt := range opts {
opt(req)
}
resp, err := c.Service.ReplicateMessage(ctx, req)
if err != nil {
return nil, err
}
err = handleRespStatus(resp.GetStatus())
if err != nil {
return nil, err
}
return &entity.MessageInfo{
Position: resp.GetPosition(),
}, nil
}
23 changes: 23 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

"github.com/cockroachdb/errors"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
)
Expand All @@ -34,6 +36,7 @@ type createCollOpt struct {
AutoID bool
EnableDynamicSchema bool
Properties map[string]string
MsgBase *commonpb.MsgBase
}

func WithPKFieldName(name string) CreateCollectionOption {
Expand Down Expand Up @@ -262,3 +265,23 @@ func GetWithOutputFields(outputFields ...string) GetOption {
o.outputFields = outputFields
}
}

type DropCollectionOption func(*milvuspb.DropCollectionRequest)

type ReleaseCollectionOption func(*milvuspb.ReleaseCollectionRequest)

type CreateIndexOption func(*milvuspb.CreateIndexRequest)

type DropIndexOption func(*milvuspb.DropIndexRequest)

type FlushOption func(*milvuspb.FlushRequest)

type CreateDatabaseOption func(*milvuspb.CreateDatabaseRequest)

type DropDatabaseOption func(*milvuspb.DropDatabaseRequest)

type ReplicateMessageOption func(*milvuspb.ReplicateMessageRequest)

type CreatePartitionOption func(*milvuspb.CreatePartitionRequest)

type DropPartitionOption func(*milvuspb.DropPartitionRequest)
66 changes: 66 additions & 0 deletions client/options_msg_base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package client

import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
)

func WithCreateCollectionMsgBase(msgBase *commonpb.MsgBase) CreateCollectionOption {
return func(opt *createCollOpt) {
opt.MsgBase = msgBase
}
}

func WithDropCollectionMsgBase(msgBase *commonpb.MsgBase) DropCollectionOption {
return func(req *milvuspb.DropCollectionRequest) {
req.Base = msgBase
}
}

func WithLoadCollectionMsgBase(msgBase *commonpb.MsgBase) LoadCollectionOption {
return func(req *milvuspb.LoadCollectionRequest) {
req.Base = msgBase
}
}

func WithReleaseCollectionMsgBase(msgBase *commonpb.MsgBase) ReleaseCollectionOption {
return func(req *milvuspb.ReleaseCollectionRequest) {
req.Base = msgBase
}
}

func WithFlushMsgBase(msgBase *commonpb.MsgBase) FlushOption {
return func(req *milvuspb.FlushRequest) {
req.Base = msgBase
}
}

func WithCreateDatabaseMsgBase(msgBase *commonpb.MsgBase) CreateDatabaseOption {
return func(req *milvuspb.CreateDatabaseRequest) {
req.Base = msgBase
}
}

func WithDropDatabaseMsgBase(msgBase *commonpb.MsgBase) DropDatabaseOption {
return func(req *milvuspb.DropDatabaseRequest) {
req.Base = msgBase
}
}

func WithReplicateMessageMsgBase(msgBase *commonpb.MsgBase) ReplicateMessageOption {
return func(req *milvuspb.ReplicateMessageRequest) {
req.Base = msgBase
}
}

func WithCreatePartitionMsgBase(msgBase *commonpb.MsgBase) CreatePartitionOption {
return func(req *milvuspb.CreatePartitionRequest) {
req.Base = msgBase
}
}

func WithDropPartitionMsgBase(msgBase *commonpb.MsgBase) DropPartitionOption {
return func(req *milvuspb.DropPartitionRequest) {
req.Base = msgBase
}
}
5 changes: 5 additions & 0 deletions entity/mq_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package entity

type MessageInfo struct {
Position string
}
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1 h1:4qD1QJN6jDdote1UAZbEQoxKcH399oVod1GtiBYWQtQ=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2 h1:tBcKiEUcX6i3MaFYvMJO1F7R6fIoeLFkg1kSGE1Tvpk=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
Expand Down
Loading

0 comments on commit 27c81ab

Please sign in to comment.