diff --git a/client/client.go b/client/client.go index cfc1a883..ec3765d5 100644 --- a/client/client.go +++ b/client/client.go @@ -214,7 +214,9 @@ type Client interface { // ListResourceGroups returns list of resource group names in current Milvus instance. ListResourceGroups(ctx context.Context) ([]string, error) // CreateResourceGroup creates a resource group with provided name. - CreateResourceGroup(ctx context.Context, rgName string) error + CreateResourceGroup(ctx context.Context, rgName string, opts ...CreateResourceGroupOption) error + // UpdateResourceGroups updates resource groups with provided options. + UpdateResourceGroups(ctx context.Context, opts ...UpdateResourceGroupsOption) error // DescribeResourceGroup returns resource groups information. DescribeResourceGroup(ctx context.Context, rgName string) (*entity.ResourceGroup, error) // DropResourceGroup drops the resource group with provided name. diff --git a/client/options.go b/client/options.go index 6ac2a962..0254dd7b 100644 --- a/client/options.go +++ b/client/options.go @@ -322,3 +322,26 @@ type DropPartitionOption func(*milvuspb.DropPartitionRequest) type LoadPartitionsOption func(*milvuspb.LoadPartitionsRequest) type ReleasePartitionsOption func(*milvuspb.ReleasePartitionsRequest) + +// CreateResourceGroupOption is an option that is used in CreateResourceGroup API. +type CreateResourceGroupOption func(*milvuspb.CreateResourceGroupRequest) + +// WithCreateResourceGroupConfig returns a CreateResourceGroupOption that setup the config. +func WithCreateResourceGroupConfig(config *entity.ResourceGroupConfig) CreateResourceGroupOption { + return func(req *milvuspb.CreateResourceGroupRequest) { + req.Config = config + } +} + +// UpdateResourceGroupsOption is an option that is used in UpdateResourceGroups API. +type UpdateResourceGroupsOption func(*milvuspb.UpdateResourceGroupsRequest) + +// WithUpdateResourceGroupConfig returns an UpdateResourceGroupsOption that sets the new config to the specified resource group. +func WithUpdateResourceGroupConfig(resourceGroupName string, config *entity.ResourceGroupConfig) UpdateResourceGroupsOption { + return func(urgr *milvuspb.UpdateResourceGroupsRequest) { + if urgr.ResourceGroups == nil { + urgr.ResourceGroups = make(map[string]*entity.ResourceGroupConfig) + } + urgr.ResourceGroups[resourceGroupName] = config + } +} diff --git a/client/options_test.go b/client/options_test.go index bc9194cb..23f1dc7b 100644 --- a/client/options_test.go +++ b/client/options_test.go @@ -177,3 +177,27 @@ func TestMakeSearchQueryOption(t *testing.T) { assert.Error(t, err) }) } + +func TestWithUpdateResourceGroupConfig(t *testing.T) { + req := &milvuspb.UpdateResourceGroupsRequest{} + + WithUpdateResourceGroupConfig("rg1", &entity.ResourceGroupConfig{ + Requests: &entity.ResourceGroupLimit{NodeNum: 1}, + })(req) + WithUpdateResourceGroupConfig("rg2", &entity.ResourceGroupConfig{ + Requests: &entity.ResourceGroupLimit{NodeNum: 2}, + })(req) + + assert.Equal(t, 2, len(req.ResourceGroups)) + assert.Equal(t, int32(1), req.ResourceGroups["rg1"].Requests.NodeNum) + assert.Equal(t, int32(2), req.ResourceGroups["rg2"].Requests.NodeNum) +} + +func TestWithCreateResourceGroup(t *testing.T) { + req := &milvuspb.CreateResourceGroupRequest{} + + WithCreateResourceGroupConfig(&entity.ResourceGroupConfig{ + Requests: &entity.ResourceGroupLimit{NodeNum: 1}, + })(req) + assert.Equal(t, int32(1), req.Config.Requests.NodeNum) +} diff --git a/client/resource_group.go b/client/resource_group.go index fc3f90db..6cfd6552 100644 --- a/client/resource_group.go +++ b/client/resource_group.go @@ -38,7 +38,7 @@ func (c *GrpcClient) ListResourceGroups(ctx context.Context) ([]string, error) { } // CreateResourceGroup creates a resource group with provided name. -func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string) error { +func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string, opts ...CreateResourceGroupOption) error { if c.Service == nil { return ErrClientNotReady } @@ -46,6 +46,9 @@ func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string) err req := &milvuspb.CreateResourceGroupRequest{ ResourceGroup: rgName, } + for _, opt := range opts { + opt(req) + } resp, err := c.Service.CreateResourceGroup(ctx, req) if err != nil { @@ -54,6 +57,24 @@ func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string) err return handleRespStatus(resp) } +// UpdateResourceGroups updates resource groups with provided options. +func (c *GrpcClient) UpdateResourceGroups(ctx context.Context, opts ...UpdateResourceGroupsOption) error { + if c.Service == nil { + return ErrClientNotReady + } + + req := &milvuspb.UpdateResourceGroupsRequest{} + for _, opt := range opts { + opt(req) + } + + resp, err := c.Service.UpdateResourceGroups(ctx, req) + if err != nil { + return err + } + return handleRespStatus(resp) +} + // DescribeResourceGroup returns resource groups information. func (c *GrpcClient) DescribeResourceGroup(ctx context.Context, rgName string) (*entity.ResourceGroup, error) { if c.Service == nil { @@ -80,6 +101,8 @@ func (c *GrpcClient) DescribeResourceGroup(ctx context.Context, rgName string) ( LoadedReplica: rg.GetNumLoadedReplica(), OutgoingNodeNum: rg.GetNumOutgoingNode(), IncomingNodeNum: rg.GetNumIncomingNode(), + Config: rg.GetConfig(), + Nodes: rg.GetNodes(), } return result, nil diff --git a/client/resource_group_test.go b/client/resource_group_test.go index 7b4e9096..ce9096f9 100644 --- a/client/resource_group_test.go +++ b/client/resource_group_test.go @@ -23,6 +23,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-sdk-go/v2/entity" ) type ResourceGroupSuite struct { @@ -121,6 +122,68 @@ func (s *ResourceGroupSuite) TestCreateResourceGroup() { }) } +func (s *ResourceGroupSuite) TestUpdateResourceGroups() { + c := s.client + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("normal_run", func() { + defer s.resetMock() + rgName := randStr(10) + + s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.AnythingOfType("*milvuspb.UpdateResourceGroupsRequest")). + Run(func(_ context.Context, req *milvuspb.UpdateResourceGroupsRequest) { + s.Len(req.ResourceGroups, 1) + s.NotNil(req.ResourceGroups[rgName]) + s.Equal(int32(1), req.ResourceGroups[rgName].Requests.NodeNum) + }). + Return(&commonpb.Status{}, nil) + + err := c.UpdateResourceGroups(ctx, WithUpdateResourceGroupConfig(rgName, &entity.ResourceGroupConfig{ + Requests: &entity.ResourceGroupLimit{NodeNum: 1}, + })) + s.NoError(err) + }) + + s.Run("request_fails", func() { + defer s.resetMock() + + rgName := randStr(10) + + s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.AnythingOfType("*milvuspb.UpdateResourceGroupsRequest")). + Run(func(_ context.Context, req *milvuspb.UpdateResourceGroupsRequest) { + s.Len(req.ResourceGroups, 1) + s.NotNil(req.ResourceGroups[rgName]) + s.Equal(int32(1), req.ResourceGroups[rgName].Requests.NodeNum) + }). + Return(nil, errors.New("mocked grpc error")) + + err := c.UpdateResourceGroups(ctx, WithUpdateResourceGroupConfig(rgName, &entity.ResourceGroupConfig{ + Requests: &entity.ResourceGroupLimit{NodeNum: 1}, + })) + s.Error(err) + }) + + s.Run("server_return_err", func() { + defer s.resetMock() + + rgName := randStr(10) + + s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.AnythingOfType("*milvuspb.UpdateResourceGroupsRequest")). + Run(func(_ context.Context, req *milvuspb.UpdateResourceGroupsRequest) { + s.Len(req.ResourceGroups, 1) + s.NotNil(req.ResourceGroups[rgName]) + s.Equal(int32(1), req.ResourceGroups[rgName].Requests.NodeNum) + }). + Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil) + + err := c.UpdateResourceGroups(ctx, WithUpdateResourceGroupConfig(rgName, &entity.ResourceGroupConfig{ + Requests: &entity.ResourceGroupLimit{NodeNum: 1}, + })) + s.Error(err) + }) +} + func (s *ResourceGroupSuite) TestDescribeResourceGroup() { c := s.client ctx, cancel := context.WithCancel(context.Background()) @@ -153,7 +216,6 @@ func (s *ResourceGroupSuite) TestDescribeResourceGroup() { s.Equal(rgName, req.GetResourceGroup()) }). Call.Return(func(_ context.Context, req *milvuspb.DescribeResourceGroupRequest) *milvuspb.DescribeResourceGroupResponse { - return &milvuspb.DescribeResourceGroupResponse{ Status: &commonpb.Status{}, ResourceGroup: &milvuspb.ResourceGroup{ diff --git a/entity/resource_group.go b/entity/resource_group.go index 77eb070f..873edb89 100644 --- a/entity/resource_group.go +++ b/entity/resource_group.go @@ -1,5 +1,17 @@ package entity +import ( + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" +) + +type ( + ResourceGroupConfig = rgpb.ResourceGroupConfig + ResourceGroupLimit = rgpb.ResourceGroupLimit + ResourceGroupTransfer = rgpb.ResourceGroupTransfer + NodeInfo = commonpb.NodeInfo +) + // ResourceGroup information model struct. type ResourceGroup struct { Name string @@ -8,4 +20,6 @@ type ResourceGroup struct { LoadedReplica map[string]int32 OutgoingNodeNum map[string]int32 IncomingNodeNum map[string]int32 + Config *ResourceGroupConfig + Nodes []*NodeInfo } diff --git a/examples/resourcegroup/resourcegroup.go b/examples/resourcegroup/resourcegroup.go new file mode 100644 index 00000000..169bcff9 --- /dev/null +++ b/examples/resourcegroup/resourcegroup.go @@ -0,0 +1,180 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "time" + + "github.com/milvus-io/milvus-sdk-go/v2/client" + "github.com/milvus-io/milvus-sdk-go/v2/entity" +) + +const ( + milvusAddr = `localhost:19530` + recycleResourceGroup = `__recycle_resource_group` + defaultResourceGroup = `__default_resource_group` + rg1 = `rg1` + rg2 = `rg2` +) + +func main() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + c, err := client.NewClient(ctx, client.Config{ + Address: milvusAddr, + }) + if err != nil { + log.Fatal("failed to connect to milvus, err: ", err.Error()) + } + defer c.Close() + + ctx = context.Background() + showAllResourceGroup(ctx, c) + + // query node count: 1 + // | RG | Request | Limit | Nodes | + // | -- | ------- | ----- | ----- | + // | __default__resource_group | 1 | 1 | 1 | + // | __recycle__resource_group | 0 | 10000 | 0 | + // | rg1 | 0 | 0 | 0 | + // | rg2 | 0 | 0 | 0 | + if err := initializeCluster(ctx, c); err != nil { + log.Fatal("failed to initialize cluster, err: ", err.Error()) + } + + showAllResourceGroup(ctx, c) + + // do some resource group managements. + if err := resourceGroupManagement(ctx, c); err != nil { + log.Fatal("failed to manage resource group, err: ", err.Error()) + } +} + +// initializeCluster initializes the cluster with 4 resource groups. +func initializeCluster(ctx context.Context, c client.Client) error { + // Use a huge resource group to hold the redundant query node. + if err := c.CreateResourceGroup(ctx, recycleResourceGroup, client.WithCreateResourceGroupConfig( + &entity.ResourceGroupConfig{ + Requests: &entity.ResourceGroupLimit{NodeNum: 0}, + Limits: &entity.ResourceGroupLimit{NodeNum: 10000}, + }, + )); err != nil { + return err + } + + if err := c.UpdateResourceGroups(ctx, client.WithUpdateResourceGroupConfig(defaultResourceGroup, newResourceGroupCfg(1, 1))); err != nil { + return err + } + + if err := c.CreateResourceGroup(ctx, rg1); err != nil { + return err + } + + return c.CreateResourceGroup(ctx, rg2) +} + +// resourceGroupManagement manages the resource groups. +func resourceGroupManagement(ctx context.Context, c client.Client) error { + // Update resource group config. + // | RG | Request | Limit | Nodes | + // | -- | ------- | ----- | ----- | + // | __default__resource_group | 1 | 1 | 1 | + // | __recycle__resource_group | 0 | 10000 | 0 | + // | rg1 | 1 | 1 | 0 | + // | rg2 | 2 | 2 | 0 | + if err := c.UpdateResourceGroups(ctx, + client.WithUpdateResourceGroupConfig(rg1, newResourceGroupCfg(1, 1)), + client.WithUpdateResourceGroupConfig(rg2, newResourceGroupCfg(2, 2)), + ); err != nil { + return err + } + showAllResourceGroup(ctx, c) + + // scale out cluster, new query node will be added to rg1 and rg2. + // | RG | Request | Limit | Nodes | + // | -- | ------- | ----- | ----- | + // | __default__resource_group | 1 | 1 | 1 | + // | __recycle__resource_group | 0 | 10000 | 0 | + // | rg1 | 1 | 1 | 1 | + // | rg2 | 2 | 2 | 2 | + scaleTo(ctx, 4) + showAllResourceGroup(ctx, c) + + // scale out cluster, new query node will be added to __recycle__resource_group. + // | RG | Request | Limit | Nodes | + // | -- | ------- | ----- | ----- | + // | __default__resource_group | 1 | 1 | 1 | + // | __recycle__resource_group | 0 | 10000 | 1 | + // | rg1 | 1 | 1 | 1 | + // | rg2 | 2 | 2 | 2 | + scaleTo(ctx, 5) + showAllResourceGroup(ctx, c) + + // Update resource group config, redundant query node will be transferred to recycle resource group. + // | RG | Request | Limit | Nodes | + // | -- | ------- | ----- | ----- | + // | __default__resource_group | 1 | 1 | 1 | + // | __recycle__resource_group | 0 | 10000 | 2 | + // | rg1 | 1 | 1 | 1 | + // | rg2 | 1 | 1 | 1 | + if err := c.UpdateResourceGroups(ctx, + client.WithUpdateResourceGroupConfig(rg1, newResourceGroupCfg(1, 1)), + client.WithUpdateResourceGroupConfig(rg2, newResourceGroupCfg(1, 1)), + ); err != nil { + return err + } + showAllResourceGroup(ctx, c) + + // Update resource group config, rg1 and rg2 will transfer missing node from __recycle__resource_group. + // | RG | Request | Limit | Nodes | + // | -- | ------- | ----- | ----- | + // | __default__resource_group | 1 | 1 | 1 | + // | __recycle__resource_group | 0 | 10000 | 0 | + // | rg1 | 2 | 2 | 2 | + // | rg2 | 2 | 2 | 2 | + if err := c.UpdateResourceGroups(ctx, + client.WithUpdateResourceGroupConfig(rg1, newResourceGroupCfg(2, 2)), + client.WithUpdateResourceGroupConfig(rg2, newResourceGroupCfg(2, 2)), + ); err != nil { + return err + } + showAllResourceGroup(ctx, c) + + return nil +} + +// scaleTo scales the cluster to the specified node number. +func scaleTo(_ context.Context, _ int) { + // Cannot implement by milvus core and sdk, + // Need to be implement by orchestration system. +} + +func newResourceGroupCfg(request int32, limit int32) *entity.ResourceGroupConfig { + return &entity.ResourceGroupConfig{ + Requests: &entity.ResourceGroupLimit{NodeNum: request}, + Limits: &entity.ResourceGroupLimit{NodeNum: limit}, + TransferFrom: []*entity.ResourceGroupTransfer{{ResourceGroup: recycleResourceGroup}}, + TransferTo: []*entity.ResourceGroupTransfer{{ResourceGroup: recycleResourceGroup}}, + } +} + +// showAllResourceGroup shows all resource groups. +func showAllResourceGroup(ctx context.Context, c client.Client) { + rgs, err := c.ListResourceGroups(ctx) + if err != nil { + log.Fatal("failed to list resource groups, err: ", err.Error()) + } + log.Println("resource groups:") + for _, rg := range rgs { + rg, err := c.DescribeResourceGroup(ctx, rg) + if err != nil { + log.Fatal("failed to describe resource group, err: ", err.Error()) + } + results, err := json.Marshal(rg) + if err != nil { + log.Fatal("failed to marshal resource group, err: ", err.Error()) + } + log.Printf("%s\n", results) + } +} diff --git a/test/testcases/resource_group_test.go b/test/testcases/resource_group_test.go index d8b41629..6c3cf92c 100644 --- a/test/testcases/resource_group_test.go +++ b/test/testcases/resource_group_test.go @@ -18,8 +18,10 @@ import ( "github.com/milvus-io/milvus-sdk-go/v2/test/common" ) -const configQnNodes = int32(4) -const newRgNode = int32(2) +const ( + configQnNodes = int32(4) + newRgNode = int32(2) +) func resetRgs(t *testing.T, ctx context.Context, mc *base.MilvusClient) { // release and drop all collections @@ -371,7 +373,6 @@ func TestTransferReplicas(t *testing.T) { // check search result contains search vector, which from all partitions common.CheckErr(t, err, true) common.CheckSearchResult(t, searchRes, common.DefaultNq, common.DefaultTopK) - } // test transfer replica of not existed collection