From c84a0748c48f1e1aed381955750fb66e79120bbe Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 24 Jan 2025 16:55:07 +0800 Subject: [PATCH] enhance: add rw/ro streaming query node replica management (#38677) issue: #38399 - Embed the query node into streaming node to make delegator available at streaming node. - The embedded query node has a special server label `QUERYNODE_STREAMING-EMBEDDED`. - Change the balance strategy to make the channel assigned to streaming node as much as possible. Signed-off-by: chyezh --- cmd/milvus/util.go | 6 + .../snmanager/streaming_node_manager.go | 106 ++++++++++++++++++ .../snmanager/streaming_node_manager_test.go | 62 ++++++++++ internal/querycoordv2/balance/balance.go | 18 ++- .../balance/channel_level_score_balancer.go | 16 ++- .../balance/multi_target_balance.go | 66 ++++++----- .../balance/rowcount_based_balancer.go | 96 ++++++++++------ .../balance/score_based_balancer.go | 88 +++++++++++---- .../streaming_query_node_channel_helper.go | 64 +++++++++++ ...treaming_query_node_channel_helper_test.go | 76 +++++++++++++ .../querycoordv2/checkers/channel_checker.go | 11 +- internal/querycoordv2/meta/replica.go | 101 +++++++++++++++-- internal/querycoordv2/meta/replica_manager.go | 60 ++++++++++ .../meta/replica_manager_helper.go | 68 +++++++++++ .../querycoordv2/meta/replica_manager_test.go | 45 ++++++-- internal/querycoordv2/meta/replica_test.go | 49 ++++++++ .../observers/replica_observer.go | 71 +++++++++++- .../observers/replica_observer_test.go | 102 +++++++++++++++++ internal/querycoordv2/ops_services.go | 23 +++- internal/querycoordv2/server.go | 8 ++ internal/querycoordv2/session/node_manager.go | 5 + internal/querycoordv2/utils/meta.go | 13 +++ .../server/balancer/balancer.go | 7 +- .../server/balancer/balancer_impl.go | 10 ++ .../server/balancer/balancer_test.go | 17 ++- internal/streamingcoord/server/server.go | 2 + .../flusher/flusherimpl/channel_lifetime.go | 2 +- internal/util/sessionutil/session_util.go | 5 +- internal/util/streamingutil/env.go | 17 ++- pkg/proto/query_coord.proto | 11 +- pkg/proto/querypb/query_coord.pb.go | 34 +++++- pkg/util/merr/utils.go | 9 ++ 32 files changed, 1145 insertions(+), 123 deletions(-) create mode 100644 internal/coordinator/snmanager/streaming_node_manager.go create mode 100644 internal/coordinator/snmanager/streaming_node_manager_test.go create mode 100644 internal/querycoordv2/balance/streaming_query_node_channel_helper.go create mode 100644 internal/querycoordv2/balance/streaming_query_node_channel_helper_test.go diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go index 35040094aeb5f..70707d64eddb6 100644 --- a/cmd/milvus/util.go +++ b/cmd/milvus/util.go @@ -150,7 +150,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { role.EnableIndexNode = true case typeutil.StreamingNodeRole: streamingutil.MustEnableStreamingService() + streamingutil.EnableEmbededQueryNode() role.EnableStreamingNode = true + role.EnableQueryNode = true case typeutil.StandaloneRole, typeutil.EmbeddedRole: role.EnableRootCoord = true role.EnableProxy = true @@ -175,6 +177,10 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { role.EnableIndexNode = enableIndexNode role.EnableProxy = enableProxy role.EnableStreamingNode = enableStreamingNode + if enableStreamingNode && !enableQueryNode { + role.EnableQueryNode = true + streamingutil.EnableEmbededQueryNode() + } default: fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp()) os.Exit(-1) diff --git 
a/internal/coordinator/snmanager/streaming_node_manager.go b/internal/coordinator/snmanager/streaming_node_manager.go new file mode 100644 index 0000000000000..efed218e5d452 --- /dev/null +++ b/internal/coordinator/snmanager/streaming_node_manager.go @@ -0,0 +1,106 @@ +package snmanager + +import ( + "context" + "sync" + + "github.com/cockroachdb/errors" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var StaticStreamingNodeManager = newStreamingNodeManager() + +func newStreamingNodeManager() *StreamingNodeManager { + snm := &StreamingNodeManager{ + notifier: syncutil.NewAsyncTaskNotifier[struct{}](), + balancer: syncutil.NewFuture[balancer.Balancer](), + cond: syncutil.NewContextCond(&sync.Mutex{}), + latestAssignments: make(map[string]types.PChannelInfoAssigned), + streamingNodes: typeutil.NewUniqueSet(), + nodeChangedNotifier: syncutil.NewVersionedNotifier(), + } + go snm.execute() + return snm +} + +// StreamingNodeManager is a manager for manage the querynode that embedded into streaming node. +// StreamingNodeManager is exclusive with ResourceManager. +type StreamingNodeManager struct { + notifier *syncutil.AsyncTaskNotifier[struct{}] + balancer *syncutil.Future[balancer.Balancer] + // The coord is merged after 2.6, so we don't need to make distribution safe. + cond *syncutil.ContextCond + latestAssignments map[string]types.PChannelInfoAssigned // The latest assignments info got from streaming coord balance module. + streamingNodes typeutil.UniqueSet + nodeChangedNotifier *syncutil.VersionedNotifier // used to notify that node in streaming node manager has been changed. +} + +// GetWALLocated returns the server id of the node that the wal of the vChannel is located. +func (s *StreamingNodeManager) GetWALLocated(vChannel string) int64 { + pchannel := funcutil.ToPhysicalChannel(vChannel) + var targetServerID int64 + + s.cond.L.Lock() + for { + if assignment, ok := s.latestAssignments[pchannel]; ok { + targetServerID = assignment.Node.ServerID + break + } + s.cond.Wait(context.Background()) + } + s.cond.L.Unlock() + return targetServerID +} + +// GetStreamingQueryNodeIDs returns the server ids of the streaming query nodes. +func (s *StreamingNodeManager) GetStreamingQueryNodeIDs() typeutil.UniqueSet { + s.cond.L.Lock() + defer s.cond.L.Unlock() + return s.streamingNodes.Clone() +} + +// ListenNodeChanged returns a listener for node changed event. +func (s *StreamingNodeManager) ListenNodeChanged() *syncutil.VersionedListener { + return s.nodeChangedNotifier.Listen(syncutil.VersionedListenAtEarliest) +} + +// SetBalancerReady set the balancer ready for the streaming node manager from streamingcoord initialization. 
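+// Once the balancer is set, the background execute() goroutine starts watching channel
+// assignments and keeps latestAssignments and streamingNodes up to date.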
+func (s *StreamingNodeManager) SetBalancerReady(b balancer.Balancer) { + s.balancer.Set(b) +} + +func (s *StreamingNodeManager) execute() (err error) { + defer s.notifier.Finish(struct{}{}) + + balancer, err := s.balancer.GetWithContext(s.notifier.Context()) + if err != nil { + return errors.Wrap(err, "failed to wait balancer ready") + } + for { + if err := balancer.WatchChannelAssignments(s.notifier.Context(), func( + version typeutil.VersionInt64Pair, + relations []types.PChannelInfoAssigned, + ) error { + s.cond.LockAndBroadcast() + s.latestAssignments = make(map[string]types.PChannelInfoAssigned) + s.streamingNodes = typeutil.NewUniqueSet() + for _, relation := range relations { + s.latestAssignments[relation.Channel.Name] = relation + s.streamingNodes.Insert(relation.Node.ServerID) + } + s.nodeChangedNotifier.NotifyAll() + log.Info("streaming node manager updated", zap.Any("assignments", s.latestAssignments), zap.Any("streamingNodes", s.streamingNodes)) + s.cond.L.Unlock() + return nil + }); err != nil { + return err + } + } +} diff --git a/internal/coordinator/snmanager/streaming_node_manager_test.go b/internal/coordinator/snmanager/streaming_node_manager_test.go new file mode 100644 index 0000000000000..b26fb6a63cfac --- /dev/null +++ b/internal/coordinator/snmanager/streaming_node_manager_test.go @@ -0,0 +1,62 @@ +package snmanager + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type pChannelInfoAssigned struct { + version typeutil.VersionInt64Pair + pchannels []types.PChannelInfoAssigned +} + +func TestStreamingNodeManager(t *testing.T) { + m := newStreamingNodeManager() + b := mock_balancer.NewMockBalancer(t) + + ch := make(chan pChannelInfoAssigned, 1) + b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).Run( + func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) { + for { + select { + case <-ctx.Done(): + return + case p := <-ch: + cb(p.version, p.pchannels) + } + } + }) + m.SetBalancerReady(b) + + streamingNodes := m.GetStreamingQueryNodeIDs() + assert.Empty(t, streamingNodes) + + ch <- pChannelInfoAssigned{ + version: typeutil.VersionInt64Pair{ + Global: 1, + Local: 1, + }, + pchannels: []types.PChannelInfoAssigned{ + { + Channel: types.PChannelInfo{Name: "a_test", Term: 1}, + Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"}, + }, + }, + } + + listener := m.ListenNodeChanged() + err := listener.Wait(context.Background()) + assert.NoError(t, err) + + node := m.GetWALLocated("a_test") + assert.Equal(t, node, int64(1)) + streamingNodes = m.GetStreamingQueryNodeIDs() + assert.Equal(t, len(streamingNodes), 1) +} diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index ff8103e3b45d9..5ba140258b8f7 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -108,6 +109,8 @@ func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID 
int } func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan { + nodes = filterSQNIfStreamingServiceEnabled(nodes) + // skip out suspend node and stopping node during assignment, but skip this check for manual balance if !forceAssign { versionRangeFilter := semver.MustParseRange(">2.3.x") @@ -122,22 +125,29 @@ func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int if len(nodesInfo) == 0 { return nil } + + plans := make([]ChannelAssignPlan, 0) + scoreDelta := make(map[int64]int) + if streamingutil.IsStreamingServiceEnabled() { + channels, plans, scoreDelta = assignChannelToWALLocatedFirstForNodeInfo(channels, nodesInfo) + } + sort.Slice(nodesInfo, func(i, j int) bool { cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt() id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID() - delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1) + delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1)+scoreDelta[id1], b.scheduler.GetChannelTaskDelta(id2, -1)+scoreDelta[id2] return cnt1+delta1 < cnt2+delta2 }) - ret := make([]ChannelAssignPlan, 0, len(channels)) + for i, c := range channels { plan := ChannelAssignPlan{ Channel: c, From: -1, To: nodesInfo[i%len(nodesInfo)].ID(), } - ret = append(ret, plan) + plans = append(plans, plan) } - return ret + return plans } func (b *RoundRobinBalancer) BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { diff --git a/internal/querycoordv2/balance/channel_level_score_balancer.go b/internal/querycoordv2/balance/channel_level_score_balancer.go index 92d12f0b1f20f..9fcbe71283c3a 100644 --- a/internal/querycoordv2/balance/channel_level_score_balancer.go +++ b/internal/querycoordv2/balance/channel_level_score_balancer.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -67,6 +68,17 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica } }() + if streamingutil.IsStreamingServiceEnabled() { + // Make a plan to rebalance the channel first. + // The Streaming QueryNode doesn't make the channel level score, so just fallback to the ScoreBasedBalancer. + stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() + channelPlan := b.ScoreBasedBalancer.balanceChannels(ctx, br, replica, stoppingBalance) + // If the channelPlan is not empty, do it directly, don't do the segment balance. 
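+		// Segment balance is skipped for this round; it runs in a later round once no channel plan is produced.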
+ if len(channelPlan) > 0 { + return nil, channelPlan + } + } + exclusiveMode := true channels := b.targetMgr.GetDmChannelsByCollection(ctx, replica.GetCollectionID(), meta.CurrentTarget) for channelName := range channels { @@ -122,7 +134,7 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica zap.Any("available nodes", rwNodes), ) // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score - if b.permitBalanceChannel(replica.GetCollectionID()) { + if b.permitBalanceChannel(replica.GetCollectionID()) && !streamingutil.IsStreamingServiceEnabled() { channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, channelName, rwNodes, roNodes)...) } @@ -130,7 +142,7 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, channelName, rwNodes, roNodes)...) } } else { - if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) { + if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) && !streamingutil.IsStreamingServiceEnabled() { channelPlans = append(channelPlans, b.genChannelPlan(ctx, replica, channelName, rwNodes)...) } diff --git a/internal/querycoordv2/balance/multi_target_balance.go b/internal/querycoordv2/balance/multi_target_balance.go index ca078d5992743..051ae6ebbf071 100644 --- a/internal/querycoordv2/balance/multi_target_balance.go +++ b/internal/querycoordv2/balance/multi_target_balance.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -485,24 +486,53 @@ func (b *MultiTargetBalancer) BalanceReplica(ctx context.Context, replica *meta. 
} }() - if replica.NodesCount() == 0 { - return nil, nil + stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() + + channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance) + if len(channelPlans) == 0 { + segmentPlans = b.balanceSegments(ctx, replica, stoppingBalance) + } + return +} + +func (b *MultiTargetBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan { + var rwNodes, roNodes []int64 + if streamingutil.IsStreamingServiceEnabled() { + rwNodes, roNodes = replica.GetRWSQNodes(), replica.GetROSQNodes() + } else { + rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes() + } + + if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) { + return nil + } + + if len(roNodes) != 0 { + if !stoppingBalance { + log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) + return nil + } + return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes) } + if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() { + return b.genChannelPlan(ctx, br, replica, rwNodes) + } + return nil +} + +func (b *MultiTargetBalancer) balanceSegments(ctx context.Context, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan { rwNodes := replica.GetRWNodes() roNodes := replica.GetRONodes() - if len(rwNodes) == 0 { - // no available nodes to balance - return nil, nil + if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) { + return nil } - // print current distribution before generating plans - segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) if len(roNodes) != 0 { - if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { + if !stoppingBalance { log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) - return nil, nil + return nil } log.Info("Handle stopping nodes", @@ -510,23 +540,9 @@ func (b *MultiTargetBalancer) BalanceReplica(ctx context.Context, replica *meta. zap.Any("available nodes", rwNodes), ) // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score - if b.permitBalanceChannel(replica.GetCollectionID()) { - channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...) - } - if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) { - segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...) - } - } else { - if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) { - channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...) 
- } - - if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) { - segmentPlans = b.genSegmentPlan(ctx, replica, rwNodes) - } + return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes) } - - return segmentPlans, channelPlans + return b.genSegmentPlan(ctx, replica, rwNodes) } func (b *MultiTargetBalancer) genSegmentPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64) []SegmentAssignPlan { diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index d5be949695489..6b62714a62d08 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -26,9 +26,11 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -87,6 +89,8 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID // AssignSegment, when row count based balancer assign segments, it will assign channel to node with least global channel count. // try to make every query node has channel count func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan { + nodes = filterSQNIfStreamingServiceEnabled(nodes) + // skip out suspend node and stopping node during assignment, but skip this check for manual balance if !forceAssign { versionRangeFilter := semver.MustParseRange(">2.3.x") @@ -99,19 +103,29 @@ func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID } nodeItems := b.convertToNodeItemsByChannel(nodes) - nodeItems = lo.Shuffle(nodeItems) if len(nodeItems) == 0 { return nil } + queue := newPriorityQueue() for _, item := range nodeItems { queue.push(item) } - plans := make([]ChannelAssignPlan, 0, len(channels)) + plans := make([]ChannelAssignPlan, 0) for _, c := range channels { - // pick the node with the least channel num and allocate to it. - ni := queue.pop().(*nodeItem) + var ni *nodeItem + if streamingutil.IsStreamingServiceEnabled() { + // When streaming service is enabled, we need to assign channel to the node where WAL is located. + nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(c.GetChannelName()) + if item, ok := nodeItems[nodeID]; ok { + ni = item + } + } + if ni == nil { + // pick the node with the least channel num and allocate to it. 
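+			// This is the fallback when the WAL-located node is not among the candidates,
+			// e.g. in multi-replica mode where only one replica can use the WAL-located node.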
+ ni = queue.pop().(*nodeItem) + } plan := ChannelAssignPlan{ From: -1, To: ni.nodeID, @@ -151,8 +165,8 @@ func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []* return ret } -func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []*nodeItem { - ret := make([]*nodeItem, 0, len(nodeIDs)) +func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) map[int64]*nodeItem { + ret := make(map[int64]*nodeItem, len(nodeIDs)) for _, node := range nodeIDs { channels := b.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(node)) @@ -161,7 +175,7 @@ func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []* channelCount += b.scheduler.GetChannelTaskDelta(node, -1) // more channel num, less priority nodeItem := newNodeItem(channelCount, node) - ret = append(ret, &nodeItem) + ret[node] = &nodeItem } return ret } @@ -181,22 +195,53 @@ func (b *RowCountBasedBalancer) BalanceReplica(ctx context.Context, replica *met log.Info("balance plan generated", zap.Stringers("report details", br.records)) } }() - if replica.NodesCount() == 0 { - return nil, nil + + stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() + + channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance) + if len(channelPlans) == 0 { + segmentPlans = b.balanceSegments(ctx, replica, stoppingBalance) } + return +} +func (b *RowCountBasedBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan { + var rwNodes, roNodes []int64 + if streamingutil.IsStreamingServiceEnabled() { + rwNodes, roNodes = replica.GetRWSQNodes(), replica.GetROSQNodes() + } else { + rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes() + } + + if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) { + return nil + } + if len(roNodes) != 0 { + if !stoppingBalance { + log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) + return nil + } + return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes) + } + + if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() { + return b.genChannelPlan(ctx, br, replica, rwNodes) + } + return nil +} + +func (b *RowCountBasedBalancer) balanceSegments(ctx context.Context, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan { rwNodes := replica.GetRWNodes() roNodes := replica.GetRONodes() - if len(rwNodes) == 0 { - // no available nodes to balance - return nil, nil - } - segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) + if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) { + return nil + } + // print current distribution before generating plans if len(roNodes) != 0 { - if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { + if !stoppingBalance { log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) - return nil, nil + return nil } log.Info("Handle stopping nodes", @@ -204,24 +249,9 @@ func (b *RowCountBasedBalancer) BalanceReplica(ctx context.Context, replica *met zap.Any("available nodes", rwNodes), ) // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score - if b.permitBalanceChannel(replica.GetCollectionID()) { - channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...) 
- } - - if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) { - segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...) - } - } else { - if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) { - channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...) - } - - if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) { - segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, replica, rwNodes)...) - } + return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes) } - - return segmentPlans, channelPlans + return b.genSegmentPlan(ctx, replica, rwNodes) } func (b *RowCountBasedBalancer) genStoppingSegmentPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64, roNodes []int64) []SegmentAssignPlan { diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 8e19c8921de3d..228153f008ee8 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -25,10 +25,12 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -149,6 +151,8 @@ func (b *ScoreBasedBalancer) AssignChannel(ctx context.Context, collectionID int } func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan { + nodes = filterSQNIfStreamingServiceEnabled(nodes) + balanceBatchSize := math.MaxInt64 if !forceAssign { nodes = lo.Filter(nodes, func(node int64, _ int) bool { @@ -175,8 +179,18 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64 plans := make([]ChannelAssignPlan, 0, len(channels)) for _, ch := range channels { func(ch *meta.DmChannel) { + var targetNode *nodeItem + if streamingutil.IsStreamingServiceEnabled() { + // When streaming service is enabled, we need to assign channel to the node where WAL is located. 
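+				// This keeps the channel (and its delegator) on the streaming node that owns the WAL.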
+ nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(ch.GetChannelName()) + if item, ok := nodeItemsMap[nodeID]; ok { + targetNode = item + } + } // for each channel, pick the node with the least score - targetNode := queue.pop().(*nodeItem) + if targetNode == nil { + targetNode = queue.pop().(*nodeItem) + } // make sure candidate is always push back defer queue.push(targetNode) scoreChanges := b.calculateChannelScore(ch, collectionID) @@ -439,52 +453,78 @@ func (b *ScoreBasedBalancer) BalanceReplica(ctx context.Context, replica *meta.R log.Info("balance plan generated", zap.Stringers("nodesInfo", br.NodesInfo()), zap.Stringers("report details", br.records)) } }() + if replica.NodesCount() == 0 { br.AddRecord(StrRecord("replica has no querynode")) return nil, nil } + stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() + + channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance) + if len(channelPlans) == 0 { + segmentPlans = b.balanceSegments(ctx, br, replica, stoppingBalance) + } + return +} + +func (b *ScoreBasedBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan { + var rwNodes []int64 + var roNodes []int64 + if streamingutil.IsStreamingServiceEnabled() { + rwNodes, roNodes = replica.GetRWSQNodes(), replica.GetROSQNodes() + } else { + rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes() + } + + if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) { + return nil + } + + if len(roNodes) != 0 { + if !stoppingBalance { + log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) + br.AddRecord(StrRecord("stopping balance is disabled")) + return nil + } + + br.AddRecord(StrRecordf("executing stopping balance: %v", roNodes)) + return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes) + } + + if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() { + return b.genChannelPlan(ctx, br, replica, rwNodes) + } + return nil +} + +func (b *ScoreBasedBalancer) balanceSegments(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan { rwNodes := replica.GetRWNodes() roNodes := replica.GetRONodes() if len(rwNodes) == 0 { // no available nodes to balance br.AddRecord(StrRecord("no rwNodes to balance")) - return nil, nil + return nil + } + if !b.permitBalanceSegment(replica.GetCollectionID()) { + return nil } - // print current distribution before generating plans - segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) if len(roNodes) != 0 { - if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { + if !stoppingBalance { log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) - br.AddRecord(StrRecord("stopping balance is disabled")) - return nil, nil + return nil } log.Info("Handle stopping nodes", zap.Any("stopping nodes", roNodes), zap.Any("available nodes", rwNodes), ) - br.AddRecord(StrRecordf("executing stopping balance: %v", roNodes)) // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score - if b.permitBalanceChannel(replica.GetCollectionID()) { - channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...) 
- } - if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) { - segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...) - } - } else { - if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) { - channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...) - } - - if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) { - segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, br, replica, rwNodes)...) - } + return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes) } - - return segmentPlans, channelPlans + return b.genSegmentPlan(ctx, br, replica, rwNodes) } func (b *ScoreBasedBalancer) genStoppingChannelPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64, roNodes []int64) []ChannelAssignPlan { diff --git a/internal/querycoordv2/balance/streaming_query_node_channel_helper.go b/internal/querycoordv2/balance/streaming_query_node_channel_helper.go new file mode 100644 index 0000000000000..5a8d755d56b1f --- /dev/null +++ b/internal/querycoordv2/balance/streaming_query_node_channel_helper.go @@ -0,0 +1,64 @@ +package balance + +import ( + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/coordinator/snmanager" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/util/streamingutil" + "github.com/milvus-io/milvus/pkg/log" +) + +func assignChannelToWALLocatedFirstForNodeInfo( + channels []*meta.DmChannel, + nodeItems []*session.NodeInfo, +) (notFoundChannels []*meta.DmChannel, plans []ChannelAssignPlan, scoreDelta map[int64]int) { + plans = make([]ChannelAssignPlan, 0) + notFoundChannels = make([]*meta.DmChannel, 0) + scoreDelta = make(map[int64]int) + for _, c := range channels { + nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(c.GetChannelName()) + // Check if nodeID is in the list of nodeItems + // The nodeID may not be in the nodeItems when multi replica mode. + // Only one replica can be assigned to the node that wal is located. + found := false + for _, item := range nodeItems { + if item.ID() == nodeID { + plans = append(plans, ChannelAssignPlan{ + From: -1, + To: item.ID(), + Channel: c, + }) + found = true + scoreDelta[item.ID()] += 1 + break + } + } + if !found { + notFoundChannels = append(notFoundChannels, c) + } + } + return notFoundChannels, plans, scoreDelta +} + +// filterSQNIfStreamingServiceEnabled filter out the non-sqn querynode. 
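+// When the streaming service is enabled, channels can only be assigned to streaming query nodes,
+// so any other query node in the candidate list is dropped and logged as unexpected.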
+func filterSQNIfStreamingServiceEnabled(nodes []int64) []int64 { + if streamingutil.IsStreamingServiceEnabled() { + sqns := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs() + expectedSQNs := make([]int64, 0, len(nodes)) + unexpectedNodes := make([]int64, 0) + for _, node := range nodes { + if sqns.Contain(node) { + expectedSQNs = append(expectedSQNs, node) + } else { + unexpectedNodes = append(unexpectedNodes, node) + } + } + if len(unexpectedNodes) > 0 { + log.Warn("unexpected streaming querynode found when enable streaming service", zap.Int64s("unexpectedNodes", unexpectedNodes)) + } + return expectedSQNs + } + return nodes +} diff --git a/internal/querycoordv2/balance/streaming_query_node_channel_helper_test.go b/internal/querycoordv2/balance/streaming_query_node_channel_helper_test.go new file mode 100644 index 0000000000000..37e685ea457d4 --- /dev/null +++ b/internal/querycoordv2/balance/streaming_query_node_channel_helper_test.go @@ -0,0 +1,76 @@ +package balance + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/coordinator/snmanager" + "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/pkg/proto/datapb" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +func TestAssignChannelToWALLocatedFirst(t *testing.T) { + balancer := mock_balancer.NewMockBalancer(t) + snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer) + + balancer.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error { + versions := []typeutil.VersionInt64Pair{ + {Global: 1, Local: 2}, + } + pchans := [][]types.PChannelInfoAssigned{ + { + types.PChannelInfoAssigned{ + Channel: types.PChannelInfo{Name: "pchannel", Term: 1}, + Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"}, + }, + types.PChannelInfoAssigned{ + Channel: types.PChannelInfo{Name: "pchannel2", Term: 1}, + Node: types.StreamingNodeInfo{ServerID: 2, Address: "localhost:1"}, + }, + types.PChannelInfoAssigned{ + Channel: types.PChannelInfo{Name: "pchannel3", Term: 1}, + Node: types.StreamingNodeInfo{ServerID: 3, Address: "localhost:1"}, + }, + }, + } + for i := 0; i < len(versions); i++ { + cb(versions[i], pchans[i]) + } + <-ctx.Done() + return context.Cause(ctx) + }) + + channels := []*meta.DmChannel{ + {VchannelInfo: &datapb.VchannelInfo{ChannelName: "pchannel_v1"}}, + {VchannelInfo: &datapb.VchannelInfo{ChannelName: "pchannel2_v2"}}, + {VchannelInfo: &datapb.VchannelInfo{ChannelName: "pchannel3_v1"}}, + } + + var scoreDelta map[int64]int + nodeInfos := []*session.NodeInfo{ + session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1}), + session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 2}), + } + + notFounChannels, plans, scoreDelta := assignChannelToWALLocatedFirstForNodeInfo(channels, nodeInfos) + assert.Len(t, notFounChannels, 1) + assert.Equal(t, notFounChannels[0].GetChannelName(), "pchannel3_v1") + assert.Len(t, plans, 2) + assert.Len(t, scoreDelta, 2) + for _, plan := range plans { + if plan.Channel.GetChannelName() == "pchannel_v1" { + assert.Equal(t, plan.To, int64(1)) + assert.Equal(t, scoreDelta[1], 1) + } else { + assert.Equal(t, plan.To, 
int64(2)) + assert.Equal(t, scoreDelta[2], 1) + } + } +} diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index 231c1f8dfdf15..1e869b30c178e 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -228,9 +229,13 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*meta.DmChannel, replica *meta.Replica) []task.Task { plans := make([]balance.ChannelAssignPlan, 0) for _, ch := range channels { - rwNodes := replica.GetChannelRWNodes(ch.GetChannelName()) - if len(rwNodes) == 0 { - rwNodes = replica.GetRWNodes() + var rwNodes []int64 + if streamingutil.IsStreamingServiceEnabled() { + rwNodes = replica.GetRWSQNodes() + } else { + if rwNodes = replica.GetChannelRWNodes(ch.GetChannelName()); len(rwNodes) == 0 { + rwNodes = replica.GetRWNodes() + } } plan := c.getBalancerFunc().AssignChannel(ctx, replica.GetCollectionID(), []*meta.DmChannel{ch}, rwNodes, true) plans = append(plans, plan...) diff --git a/internal/querycoordv2/meta/replica.go b/internal/querycoordv2/meta/replica.go index 8f1ab12b0c5ad..ff28590d091f6 100644 --- a/internal/querycoordv2/meta/replica.go +++ b/internal/querycoordv2/meta/replica.go @@ -18,13 +18,23 @@ var NilReplica = newReplica(&querypb.Replica{ // So only read only operations are allowed on these type. type Replica struct { replicaPB *querypb.Replica - rwNodes typeutil.UniqueSet // a helper field for manipulating replica's Available Nodes slice field. + // Nodes is the legacy querynode that is not embedded in the streamingnode, which can only load sealed segment. + rwNodes typeutil.UniqueSet // a helper field for manipulating replica's Available Nodes slice field. // always keep consistent with replicaPB.Nodes. // mutual exclusive with roNodes. roNodes typeutil.UniqueSet // a helper field for manipulating replica's RO Nodes slice field. // always keep consistent with replicaPB.RoNodes. - // node used by replica but cannot add more channel or segment ont it. + // node used by replica but cannot add segment on it. // include rebalance node or node out of resource group. + + // SQNodes is the querynode that is embedded in the streamingnode, which can only watch channel and load growing segment. + rwSQNodes typeutil.UniqueSet // a helper field for manipulating replica's RW SQ Nodes slice field. + // always keep consistent with replicaPB.RwSqNodes. + // mutable exclusive with roSQNodes. + roSQNodes typeutil.UniqueSet // a helper field for manipulating replica's RO SQ Nodes slice field. + // always keep consistent with replicaPB.RoSqNodes. + // node used by replica but cannot add more channel on it. + // include the rebalance node. } // Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead. 
@@ -44,6 +54,8 @@ func newReplica(replica *querypb.Replica) *Replica { replicaPB: proto.Clone(replica).(*querypb.Replica), rwNodes: typeutil.NewUniqueSet(replica.Nodes...), roNodes: typeutil.NewUniqueSet(replica.RoNodes...), + rwSQNodes: typeutil.NewUniqueSet(replica.RwSqNodes...), + roSQNodes: typeutil.NewUniqueSet(replica.RoSqNodes...), } } @@ -65,10 +77,12 @@ func (replica *Replica) GetResourceGroup() string { // GetNodes returns the rw nodes of the replica. // readonly, don't modify the returned slice. func (replica *Replica) GetNodes() []int64 { - nodes := make([]int64, 0) - nodes = append(nodes, replica.replicaPB.GetRoNodes()...) - nodes = append(nodes, replica.replicaPB.GetNodes()...) - return nodes + nodes := typeutil.NewUniqueSet() + nodes.Insert(replica.replicaPB.GetRoNodes()...) + nodes.Insert(replica.replicaPB.GetNodes()...) + nodes.Insert(replica.replicaPB.GetRwSqNodes()...) + nodes.Insert(replica.replicaPB.GetRoSqNodes()...) + return nodes.Collect() } // GetRONodes returns the ro nodes of the replica. @@ -83,6 +97,18 @@ func (replica *Replica) GetRWNodes() []int64 { return replica.replicaPB.GetNodes() } +// GetROSQNodes returns the ro sq nodes of the replica. +// readonly, don't modify the returned slice. +func (replica *Replica) GetROSQNodes() []int64 { + return replica.replicaPB.GetRoSqNodes() +} + +// GetRWSQNodes returns the rw sq nodes of the replica. +// readonly, don't modify the returned slice. +func (replica *Replica) GetRWSQNodes() []int64 { + return replica.replicaPB.GetRwSqNodes() +} + // RangeOverRWNodes iterates over the read and write nodes of the replica. func (replica *Replica) RangeOverRWNodes(f func(node int64) bool) { replica.rwNodes.Range(f) @@ -93,6 +119,16 @@ func (replica *Replica) RangeOverRONodes(f func(node int64) bool) { replica.roNodes.Range(f) } +// RangeOverRWSQNodes iterates over the read and write streaming query nodes of the replica. +func (replica *Replica) RangeOverRWSQNodes(f func(node int64) bool) { + replica.rwSQNodes.Range(f) +} + +// RangeOverROSQNodes iterates over the ro streaming query nodes of the replica. +func (replica *Replica) RangeOverROSQNodes(f func(node int64) bool) { + replica.roSQNodes.Range(f) +} + // RWNodesCount returns the count of rw nodes of the replica. func (replica *Replica) RWNodesCount() int { return replica.rwNodes.Len() @@ -103,6 +139,16 @@ func (replica *Replica) RONodesCount() int { return replica.roNodes.Len() } +// RWSQNodesCount returns the count of rw nodes of the replica. +func (replica *Replica) RWSQNodesCount() int { + return replica.rwSQNodes.Len() +} + +// ROSQNodesCount returns the count of ro nodes of the replica. +func (replica *Replica) ROSQNodesCount() int { + return replica.roSQNodes.Len() +} + // NodesCount returns the count of rw nodes and ro nodes of the replica. func (replica *Replica) NodesCount() int { return replica.rwNodes.Len() + replica.roNodes.Len() @@ -110,7 +156,7 @@ func (replica *Replica) NodesCount() int { // Contains checks if the node is in rw nodes of the replica. func (replica *Replica) Contains(node int64) bool { - return replica.ContainRONode(node) || replica.ContainRWNode(node) + return replica.ContainRONode(node) || replica.ContainRWNode(node) || replica.ContainSQNode(node) || replica.ContainRWSQNode(node) } // ContainRONode checks if the node is in ro nodes of the replica. @@ -123,6 +169,21 @@ func (replica *Replica) ContainRWNode(node int64) bool { return replica.rwNodes.Contain(node) } +// ContainSQNode checks if the node is in rw sq nodes of the replica. 
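+// It returns true for both RO and RW streaming query nodes of the replica.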
+func (replica *Replica) ContainSQNode(node int64) bool { + return replica.ContainROSQNode(node) || replica.ContainRWSQNode(node) +} + +// ContainRWSQNode checks if the node is in rw sq nodes of the replica. +func (replica *Replica) ContainROSQNode(node int64) bool { + return replica.roSQNodes.Contain(node) +} + +// ContainRWSQNode checks if the node is in rw sq nodes of the replica. +func (replica *Replica) ContainRWSQNode(node int64) bool { + return replica.rwSQNodes.Contain(node) +} + // Deprecated: Warning, break the consistency of ReplicaManager, use `SetAvailableNodesInSameCollectionAndRG` in ReplicaManager instead. // TODO: removed in future, only for old unittest now. func (replica *Replica) AddRWNode(nodes ...int64) { @@ -154,6 +215,8 @@ func (replica *Replica) CopyForWrite() *mutableReplica { replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica), rwNodes: typeutil.NewUniqueSet(replica.replicaPB.Nodes...), roNodes: typeutil.NewUniqueSet(replica.replicaPB.RoNodes...), + rwSQNodes: typeutil.NewUniqueSet(replica.replicaPB.RwSqNodes...), + roSQNodes: typeutil.NewUniqueSet(replica.replicaPB.RoSqNodes...), }, exclusiveRWNodeToChannel: exclusiveRWNodeToChannel, } @@ -209,6 +272,30 @@ func (replica *mutableReplica) RemoveNode(nodes ...int64) { replica.tryBalanceNodeForChannel() } +// AddRWSQNode adds the node to rw sq nodes of the replica. +func (replica *mutableReplica) AddRWSQNode(nodes ...int64) { + replica.roSQNodes.Remove(nodes...) + replica.replicaPB.RoSqNodes = replica.roSQNodes.Collect() + replica.rwSQNodes.Insert(nodes...) + replica.replicaPB.RwSqNodes = replica.rwSQNodes.Collect() +} + +// AddROSQNode add the node to ro sq nodes of the replica. +func (replica *mutableReplica) AddROSQNode(nodes ...int64) { + replica.rwSQNodes.Remove(nodes...) + replica.replicaPB.RwSqNodes = replica.rwSQNodes.Collect() + replica.roSQNodes.Insert(nodes...) + replica.replicaPB.RoSqNodes = replica.roSQNodes.Collect() +} + +// RemoveSQNode removes the node from rw sq nodes and ro sq nodes of the replica. +func (replica *mutableReplica) RemoveSQNode(nodes ...int64) { + replica.rwSQNodes.Remove(nodes...) + replica.replicaPB.RwSqNodes = replica.rwSQNodes.Collect() + replica.roSQNodes.Remove(nodes...) + replica.replicaPB.RoSqNodes = replica.roSQNodes.Collect() +} + func (replica *mutableReplica) removeChannelExclusiveNodes(nodes ...int64) { channelNodeMap := make(map[string][]int64) for _, nodeID := range nodes { diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index f204320e969bd..5a8b22936c5a5 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -497,6 +497,21 @@ func (m *ReplicaManager) RemoveNode(ctx context.Context, replicaID typeutil.Uniq return m.put(ctx, mutableReplica.IntoReplica()) } +// RemoveSQNode removes the sq node from all replicas of given collection. +func (m *ReplicaManager) RemoveSQNode(ctx context.Context, replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error { + m.rwmutex.Lock() + defer m.rwmutex.Unlock() + + replica, ok := m.replicas[replicaID] + if !ok { + return merr.WrapErrReplicaNotFound(replicaID) + } + + mutableReplica := replica.CopyForWrite() + mutableReplica.RemoveSQNode(nodes...) 
// ro -> unused + return m.put(ctx, mutableReplica.IntoReplica()) +} + func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, collection typeutil.UniqueID) typeutil.Set[string] { replicas := m.GetByCollection(ctx, collection) ret := typeutil.NewSet(lo.Map(replicas, func(r *Replica, _ int) string { return r.GetResourceGroup() })...) @@ -542,3 +557,48 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string } return string(ret) } + +// RecoverSQNodesInCollection recovers all sq nodes in collection with latest node list. +// Promise a node will be only assigned to one replica in same collection at same time. +// 1. Move the rw nodes to ro nodes if current replica use too much sqn. +// 2. Add new incoming nodes into the replica if they are not ro node of other replicas in same collection. +// 3. replicas will shared the nodes in resource group fairly. +func (m *ReplicaManager) RecoverSQNodesInCollection(ctx context.Context, collectionID int64, sqnNodeIDs typeutil.UniqueSet) error { + m.rwmutex.Lock() + defer m.rwmutex.Unlock() + + collReplicas, ok := m.coll2Replicas[collectionID] + if !ok { + return errors.Errorf("collection %d not loaded", collectionID) + } + + helper := newReplicaSQNAssignmentHelper(collReplicas.replicas, sqnNodeIDs) + helper.updateExpectedNodeCountForReplicas(len(sqnNodeIDs)) + + modifiedReplicas := make([]*Replica, 0) + // recover node by given sqn node list. + helper.RangeOverReplicas(func(assignment *replicaAssignmentInfo) { + roNodes := assignment.GetNewRONodes() + recoverableNodes, incomingNodeCount := assignment.GetRecoverNodesAndIncomingNodeCount() + // There may be not enough incoming nodes for current replica, + // Even we filtering the nodes that are used by other replica of same collection in other resource group, + // current replica's expected node may be still used by other replica of same collection in same resource group. + incomingNode := helper.AllocateIncomingNodes(incomingNodeCount) + if len(roNodes) == 0 && len(recoverableNodes) == 0 && len(incomingNode) == 0 { + // nothing to do. + return + } + mutableReplica := m.replicas[assignment.GetReplicaID()].CopyForWrite() + mutableReplica.AddROSQNode(roNodes...) // rw -> ro + mutableReplica.AddRWSQNode(recoverableNodes...) // ro -> rw + mutableReplica.AddRWSQNode(incomingNode...) // unused -> rw + log.Info( + "new replica recovery streaming query node found", + zap.Int64("replicaID", assignment.GetReplicaID()), + zap.Int64s("newRONodes", roNodes), + zap.Int64s("roToRWNodes", recoverableNodes), + zap.Int64s("newIncomingNodes", incomingNode)) + modifiedReplicas = append(modifiedReplicas, mutableReplica.IntoReplica()) + }) + return m.put(ctx, modifiedReplicas...) +} diff --git a/internal/querycoordv2/meta/replica_manager_helper.go b/internal/querycoordv2/meta/replica_manager_helper.go index 1bf6f8e8fb90d..be280d09591d5 100644 --- a/internal/querycoordv2/meta/replica_manager_helper.go +++ b/internal/querycoordv2/meta/replica_manager_helper.go @@ -179,6 +179,40 @@ func newReplicaAssignmentInfo(replica *Replica, nodeInRG typeutil.UniqueSet) *re } } +func newReplicaSQNAssignmentInfo(replica *Replica, nodes typeutil.UniqueSet) *replicaAssignmentInfo { + // node in replica can be split into 3 part. 
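+	// RW nodes missing from the given set become new RO nodes; existing RO nodes are split into
+	// recoverable (still in the given set) and unrecoverable (no longer in the given set).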
+ rwNodes := make(typeutil.UniqueSet, replica.RWSQNodesCount()) + newRONodes := make(typeutil.UniqueSet, replica.ROSQNodesCount()) + unrecoverableRONodes := make(typeutil.UniqueSet, replica.ROSQNodesCount()) + recoverableRONodes := make(typeutil.UniqueSet, replica.ROSQNodesCount()) + + replica.RangeOverRWSQNodes(func(nodeID int64) bool { + if nodes.Contain(nodeID) { + rwNodes.Insert(nodeID) + } else { + newRONodes.Insert(nodeID) + } + return true + }) + + replica.RangeOverROSQNodes(func(nodeID int64) bool { + if nodes.Contain(nodeID) { + recoverableRONodes.Insert(nodeID) + } else { + unrecoverableRONodes.Insert(nodeID) + } + return true + }) + return &replicaAssignmentInfo{ + replicaID: replica.GetID(), + expectedNodeCount: 0, + rwNodes: rwNodes, + newRONodes: newRONodes, + recoverableRONodes: recoverableRONodes, + unrecoverableRONodes: unrecoverableRONodes, + } +} + type replicaAssignmentInfo struct { replicaID typeutil.UniqueID expectedNodeCount int // expected node count for each replica. @@ -236,6 +270,11 @@ func (s *replicaAssignmentInfo) GetRecoverNodesAndIncomingNodeCount() (recoverNo return recoverNodes, incomingNodeCount } +// GetUnrecoverableNodes returns the unrecoverable ro nodes for these replica. +func (s *replicaAssignmentInfo) GetUnrecoverableNodes() []int64 { + return s.unrecoverableRONodes.Collect() +} + // RangeOverAllNodes iterate all nodes in replica. func (s *replicaAssignmentInfo) RangeOverAllNodes(f func(nodeID int64)) { ff := func(nodeID int64) bool { @@ -270,3 +309,32 @@ func (s replicaAssignmentInfoSortByAvailableAndRecoverable) Less(i, j int) bool // Otherwise unstable assignment may cause unnecessary node transfer. return left < right || (left == right && s.replicaAssignmentInfoSorter[i].replicaID < s.replicaAssignmentInfoSorter[j].replicaID) } + +// newReplicaSQNAssignmentHelper creates a new replicaSQNAssignmentHelper. +func newReplicaSQNAssignmentHelper( + replicas []*Replica, + nodes typeutil.UniqueSet, +) *replicasInSameRGAssignmentHelper { + // We use a fake resource group name to create a helper. + assignmentInfos := make([]*replicaAssignmentInfo, 0, len(replicas)) + for _, replica := range replicas { + assignmentInfos = append(assignmentInfos, newReplicaSQNAssignmentInfo(replica, nodes)) + } + h := &replicasInSameRGAssignmentHelper{ + rgName: "", + nodesInRG: nodes, + incomingNodes: nodes.Clone(), + replicas: assignmentInfos, + } + // generate incoming nodes for collection. + h.RangeOverReplicas(func(assignment *replicaAssignmentInfo) { + assignment.RangeOverAllNodes(func(nodeID int64) { + if nodes.Contain(nodeID) { + h.incomingNodes.Remove(nodeID) + } + }) + }) + // update expected node count for all replicas in same resource group. + h.updateExpectedNodeCountForReplicas(len(nodes)) + return h +} diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go index 2a97ada097a47..1a1fa74853dae 100644 --- a/internal/querycoordv2/meta/replica_manager_test.go +++ b/internal/querycoordv2/meta/replica_manager_test.go @@ -154,7 +154,7 @@ func (suite *ReplicaManagerSuite) TestGet() { suite.Equal(collectionID, replica.GetCollectionID()) suite.Equal(replica, mgr.Get(ctx, replica.GetID())) suite.Equal(len(replica.replicaPB.GetNodes()), replica.RWNodesCount()) - suite.Equal(replica.replicaPB.GetNodes(), replica.GetNodes()) + suite.ElementsMatch(replica.replicaPB.GetNodes(), replica.GetNodes()) replicaNodes[replica.GetID()] = replica.GetNodes() nodes = append(nodes, replica.GetNodes()...) 
} @@ -221,7 +221,7 @@ func (suite *ReplicaManagerSuite) TestRecover() { replica := mgr.Get(ctx, 2100) suite.NotNil(replica) suite.EqualValues(1000, replica.GetCollectionID()) - suite.EqualValues([]int64{1, 2, 3}, replica.GetNodes()) + suite.ElementsMatch([]int64{1, 2, 3}, replica.GetNodes()) suite.Len(replica.GetNodes(), len(replica.GetNodes())) for _, node := range replica.GetNodes() { suite.True(replica.Contains(node)) @@ -332,12 +332,14 @@ func (suite *ReplicaManagerSuite) clearMemory() { type ReplicaManagerV2Suite struct { suite.Suite - rgs map[string]typeutil.UniqueSet - collections map[int64]collectionLoadConfig - kv kv.MetaKv - catalog metastore.QueryCoordCatalog - mgr *ReplicaManager - ctx context.Context + rgs map[string]typeutil.UniqueSet + sqNodes typeutil.UniqueSet + outboundSQNodes []int64 + collections map[int64]collectionLoadConfig + kv kv.MetaKv + catalog metastore.QueryCoordCatalog + mgr *ReplicaManager + ctx context.Context } func (suite *ReplicaManagerV2Suite) SetupSuite() { @@ -350,6 +352,8 @@ func (suite *ReplicaManagerV2Suite) SetupSuite() { "RG4": typeutil.NewUniqueSet(7, 8, 9, 10), "RG5": typeutil.NewUniqueSet(11, 12, 13, 14, 15), } + suite.sqNodes = typeutil.NewUniqueSet(16, 17, 18, 19, 20) + suite.outboundSQNodes = []int64{} suite.collections = map[int64]collectionLoadConfig{ 1000: { spawnConfig: map[string]int{"RG1": 1}, @@ -406,6 +410,7 @@ func (suite *ReplicaManagerV2Suite) TestSpawn() { rgsOfCollection[rg] = suite.rgs[rg] } mgr.RecoverNodesInCollection(ctx, id, rgsOfCollection) + mgr.RecoverSQNodesInCollection(ctx, id, suite.sqNodes) for rg := range cfg.spawnConfig { for _, node := range suite.rgs[rg].Collect() { replica := mgr.GetByCollectionAndNode(ctx, id, node) @@ -428,6 +433,10 @@ func (suite *ReplicaManagerV2Suite) testIfBalanced() { for _, r := range replicas { rgToReplica[r.GetResourceGroup()] = append(rgToReplica[r.GetResourceGroup()], r) } + + maximumSQNodes := -1 + minimumSQNodes := -1 + sqNodes := make([]int64, 0) for _, replicas := range rgToReplica { maximumNodes := -1 minimumNodes := -1 @@ -440,7 +449,15 @@ func (suite *ReplicaManagerV2Suite) testIfBalanced() { if minimumNodes == -1 || r.RWNodesCount() < minimumNodes { minimumNodes = r.RWNodesCount() } - nodes = append(nodes, r.GetNodes()...) + if maximumSQNodes == -1 || r.RWSQNodesCount() > maximumSQNodes { + maximumSQNodes = r.RWSQNodesCount() + } + if minimumSQNodes == -1 || r.RWSQNodesCount() < minimumSQNodes { + minimumSQNodes = r.RWSQNodesCount() + } + nodes = append(nodes, r.GetRWNodes()...) + nodes = append(nodes, r.GetRONodes()...) + sqNodes = append(sqNodes, r.GetRWSQNodes()...) r.RangeOverRONodes(func(node int64) bool { if availableNodes.Contain(node) { nodes = append(nodes, node) @@ -451,6 +468,10 @@ func (suite *ReplicaManagerV2Suite) testIfBalanced() { suite.ElementsMatch(nodes, suite.rgs[replicas[0].GetResourceGroup()].Collect()) suite.True(maximumNodes-minimumNodes <= 1) } + availableSQNodes := suite.sqNodes.Clone() + availableSQNodes.Remove(suite.outboundSQNodes...) 
+ suite.ElementsMatch(availableSQNodes.Collect(), sqNodes) + suite.True(maximumSQNodes-minimumSQNodes <= 1) } } @@ -475,6 +496,7 @@ func (suite *ReplicaManagerV2Suite) TestTransferReplicaAndAddNode() { suite.mgr.TransferReplica(ctx, 1005, "RG4", "RG5", 1) suite.recoverReplica(1, false) suite.rgs["RG5"].Insert(16, 17, 18) + suite.sqNodes.Insert(20, 21, 22) suite.recoverReplica(2, true) suite.testIfBalanced() } @@ -482,6 +504,7 @@ func (suite *ReplicaManagerV2Suite) TestTransferReplicaAndAddNode() { func (suite *ReplicaManagerV2Suite) TestTransferNode() { suite.rgs["RG4"].Remove(7) suite.rgs["RG5"].Insert(7) + suite.outboundSQNodes = []int64{16, 17, 18} suite.recoverReplica(2, true) suite.testIfBalanced() } @@ -497,7 +520,10 @@ func (suite *ReplicaManagerV2Suite) recoverReplica(k int, clearOutbound bool) { for rg := range cfg.spawnConfig { rgsOfCollection[rg] = suite.rgs[rg] } + sqNodes := suite.sqNodes.Clone() + sqNodes.Remove(suite.outboundSQNodes...) suite.mgr.RecoverNodesInCollection(ctx, id, rgsOfCollection) + suite.mgr.RecoverSQNodesInCollection(ctx, id, sqNodes) } // clear all outbound nodes @@ -507,6 +533,7 @@ func (suite *ReplicaManagerV2Suite) recoverReplica(k int, clearOutbound bool) { for _, r := range replicas { outboundNodes := r.GetRONodes() suite.mgr.RemoveNode(ctx, r.GetID(), outboundNodes...) + suite.mgr.RemoveSQNode(ctx, r.GetID(), r.GetROSQNodes()...) } } } diff --git a/internal/querycoordv2/meta/replica_test.go b/internal/querycoordv2/meta/replica_test.go index 2e3804d189c62..7f4cbdc5479f5 100644 --- a/internal/querycoordv2/meta/replica_test.go +++ b/internal/querycoordv2/meta/replica_test.go @@ -26,6 +26,55 @@ func (suite *ReplicaSuite) SetupSuite() { } } +func (suite *ReplicaSuite) TestSNNodes() { + replicaPB := &querypb.Replica{ + ID: 1, + CollectionID: 2, + Nodes: []int64{1, 2, 3}, + ResourceGroup: DefaultResourceGroupName, + RoNodes: []int64{4}, + RwSqNodes: []int64{6, 7, 8, 2}, + RoSqNodes: []int64{5}, + } + r := newReplica(replicaPB) + suite.Len(r.GetNodes(), 8) + suite.Len(r.GetROSQNodes(), r.ROSQNodesCount()) + suite.Len(r.GetRWSQNodes(), r.RWSQNodesCount()) + cnt := 0 + r.RangeOverRWSQNodes(func(nodeID int64) bool { + cnt++ + return true + }) + suite.Equal(r.RWSQNodesCount(), cnt) + + cnt = 0 + r.RangeOverROSQNodes(func(nodeID int64) bool { + cnt++ + return true + }) + suite.Equal(r.RONodesCount(), cnt) + + suite.Len(r.GetChannelRWNodes("channel1"), 0) + + copiedR := r.CopyForWrite() + copiedR.AddRWSQNode(9, 5) + r2 := copiedR.IntoReplica() + suite.Equal(6, r2.RWSQNodesCount()) + suite.Equal(0, r2.ROSQNodesCount()) + + copiedR = r.CopyForWrite() + copiedR.AddROSQNode(7, 8) + r2 = copiedR.IntoReplica() + suite.Equal(2, r2.RWSQNodesCount()) + suite.Equal(3, r2.ROSQNodesCount()) + + copiedR = r.CopyForWrite() + copiedR.RemoveSQNode(5, 8) + r2 = copiedR.IntoReplica() + suite.Equal(3, r2.RWSQNodesCount()) + suite.Equal(0, r2.ROSQNodesCount()) +} + func (suite *ReplicaSuite) TestReadOperations() { r := newReplica(suite.replicaPB) suite.testRead(r) diff --git a/internal/querycoordv2/observers/replica_observer.go b/internal/querycoordv2/observers/replica_observer.go index ae0be66efc693..c330ce1ee93dd 100644 --- a/internal/querycoordv2/observers/replica_observer.go +++ b/internal/querycoordv2/observers/replica_observer.go @@ -23,11 +23,14 @@ import ( "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" 
"github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // check replica, find read only nodes and remove it from replica if all segment/channel has been moved @@ -55,6 +58,10 @@ func (ob *ReplicaObserver) Start() { ob.wg.Add(1) go ob.schedule(ctx) + if streamingutil.IsStreamingServiceEnabled() { + ob.wg.Add(1) + go ob.scheduleStreamingQN(ctx) + } }) } @@ -85,12 +92,74 @@ func (ob *ReplicaObserver) schedule(ctx context.Context) { } } +// scheduleStreamingQN is used to check streaming query node in replica +func (ob *ReplicaObserver) scheduleStreamingQN(ctx context.Context) { + defer ob.wg.Done() + log.Info("Start streaming query node check replica loop") + + listener := snmanager.StaticStreamingNodeManager.ListenNodeChanged() + for { + ob.waitNodeChangedOrTimeout(ctx, listener) + if ctx.Err() != nil { + log.Info("Stop streaming query node check replica observer") + return + } + + ids := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs() + ob.checkStreamingQueryNodesInReplica(ids) + } +} + func (ob *ReplicaObserver) waitNodeChangedOrTimeout(ctx context.Context, listener *syncutil.VersionedListener) { ctxWithTimeout, cancel := context.WithTimeout(ctx, params.Params.QueryCoordCfg.CheckNodeInReplicaInterval.GetAsDuration(time.Second)) defer cancel() listener.Wait(ctxWithTimeout) } +func (ob *ReplicaObserver) checkStreamingQueryNodesInReplica(sqNodeIDs typeutil.UniqueSet) { + ctx := context.Background() + log := log.Ctx(ctx).WithRateGroup("qcv2.replicaObserver", 1, 60) + collections := ob.meta.GetAll(context.Background()) + + for _, collectionID := range collections { + ob.meta.RecoverSQNodesInCollection(context.Background(), collectionID, sqNodeIDs) + } + + for _, collectionID := range collections { + replicas := ob.meta.ReplicaManager.GetByCollection(ctx, collectionID) + for _, replica := range replicas { + roSQNodes := replica.GetROSQNodes() + rwSQNodes := replica.GetRWSQNodes() + if len(roSQNodes) == 0 { + continue + } + removeNodes := make([]int64, 0, len(roSQNodes)) + for _, node := range roSQNodes { + channels := ob.distMgr.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node)) + segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node)) + if len(channels) == 0 && len(segments) == 0 { + removeNodes = append(removeNodes, node) + } + } + if len(removeNodes) == 0 { + continue + } + logger := log.With( + zap.Int64("collectionID", replica.GetCollectionID()), + zap.Int64("replicaID", replica.GetID()), + zap.Int64s("removedNodes", removeNodes), + zap.Int64s("roNodes", roSQNodes), + zap.Int64s("rwNodes", rwSQNodes), + ) + if err := ob.meta.ReplicaManager.RemoveSQNode(ctx, replica.GetID(), removeNodes...); err != nil { + logger.Warn("fail to remove streaming query node from replica", zap.Error(err)) + continue + } + logger.Info("all segment/channel has been removed from ro streaming query node, remove it from replica") + } + } +} + func (ob *ReplicaObserver) checkNodesInReplica() { ctx := context.Background() log := log.Ctx(ctx).WithRateGroup("qcv2.replicaObserver", 1, 60) @@ -135,7 +204,7 @@ func (ob *ReplicaObserver) checkNodesInReplica() { logger.Warn("fail to remove node from replica", zap.Error(err)) continue } - logger.Info("all segment/channel has been removed from ro node, try to 
remove it from replica") + logger.Info("all segment/channel has been removed from ro node, remove it from replica") } } } diff --git a/internal/querycoordv2/observers/replica_observer_test.go b/internal/querycoordv2/observers/replica_observer_test.go index 266d731a00d22..a619cdd06ce82 100644 --- a/internal/querycoordv2/observers/replica_observer_test.go +++ b/internal/querycoordv2/observers/replica_observer_test.go @@ -20,16 +20,21 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" + "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/kv" + "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -52,6 +57,7 @@ type ReplicaObserverSuite struct { } func (suite *ReplicaObserverSuite) SetupSuite() { + streamingutil.SetStreamingServiceEnabled() paramtable.Init() paramtable.Get().Save(Params.QueryCoordCfg.CheckNodeInReplicaInterval.Key, "1") } @@ -196,9 +202,105 @@ func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() { }, 30*time.Second, 2*time.Second) } +func (suite *ReplicaObserverSuite) TestCheckSQnodesInReplica() { + balancer := mock_balancer.NewMockBalancer(suite.T()) + snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer) + + change := make(chan struct{}) + balancer.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error { + versions := []typeutil.VersionInt64Pair{ + {Global: 1, Local: 2}, + {Global: 1, Local: 3}, + } + pchans := [][]types.PChannelInfoAssigned{ + { + types.PChannelInfoAssigned{ + Channel: types.PChannelInfo{Name: "pchannel", Term: 1}, + Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"}, + }, + types.PChannelInfoAssigned{ + Channel: types.PChannelInfo{Name: "pchannel2", Term: 1}, + Node: types.StreamingNodeInfo{ServerID: 2, Address: "localhost:1"}, + }, + types.PChannelInfoAssigned{ + Channel: types.PChannelInfo{Name: "pchannel3", Term: 1}, + Node: types.StreamingNodeInfo{ServerID: 3, Address: "localhost:1"}, + }, + }, + { + types.PChannelInfoAssigned{ + Channel: types.PChannelInfo{Name: "pchannel", Term: 1}, + Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"}, + }, + types.PChannelInfoAssigned{ + Channel: types.PChannelInfo{Name: "pchannel2", Term: 1}, + Node: types.StreamingNodeInfo{ServerID: 2, Address: "localhost:1"}, + }, + types.PChannelInfoAssigned{ + Channel: types.PChannelInfo{Name: "pchannel3", Term: 2}, + Node: types.StreamingNodeInfo{ServerID: 2, Address: "localhost:1"}, + }, + }, + } + for i := 0; i < len(versions); i++ { + cb(versions[i], pchans[i]) + <-change + } + <-ctx.Done() + return context.Cause(ctx) + }) + + ctx := context.Background() + err := suite.meta.CollectionManager.PutCollection(ctx, 
utils.CreateTestCollection(suite.collectionID, 2)) + suite.NoError(err) + replicas, err := suite.meta.Spawn(ctx, suite.collectionID, map[string]int{ + "rg1": 1, + "rg2": 1, + }, nil) + suite.NoError(err) + suite.Equal(2, len(replicas)) + + suite.Eventually(func() bool { + replica := suite.meta.ReplicaManager.GetByCollection(ctx, suite.collectionID) + total := 0 + for _, r := range replica { + total += r.RWSQNodesCount() + } + return total == 3 + }, 6*time.Second, 2*time.Second) + replica := suite.meta.ReplicaManager.GetByCollection(ctx, suite.collectionID) + nodes := typeutil.NewUniqueSet() + for _, r := range replica { + suite.LessOrEqual(r.RWSQNodesCount(), 2) + suite.Equal(r.ROSQNodesCount(), 0) + nodes.Insert(r.GetRWSQNodes()...) + } + suite.Equal(nodes.Len(), 3) + + close(change) + + suite.Eventually(func() bool { + replica := suite.meta.ReplicaManager.GetByCollection(ctx, suite.collectionID) + total := 0 + for _, r := range replica { + total += r.RWSQNodesCount() + } + return total == 2 + }, 6*time.Second, 2*time.Second) + replica = suite.meta.ReplicaManager.GetByCollection(ctx, suite.collectionID) + nodes = typeutil.NewUniqueSet() + for _, r := range replica { + suite.Equal(r.RWSQNodesCount(), 1) + suite.Equal(r.ROSQNodesCount(), 0) + nodes.Insert(r.GetRWSQNodes()...) + } + suite.Equal(nodes.Len(), 2) +} + func (suite *ReplicaObserverSuite) TearDownSuite() { suite.kv.Close() suite.observer.Stop() + streamingutil.UnsetStreamingServiceEnabled() } func TestReplicaObserver(t *testing.T) { diff --git a/internal/querycoordv2/ops_services.go b/internal/querycoordv2/ops_services.go index 135bf481d1b88..8c8ae76479f25 100644 --- a/internal/querycoordv2/ops_services.go +++ b/internal/querycoordv2/ops_services.go @@ -24,9 +24,11 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/querypb" "github.com/milvus-io/milvus/pkg/util/merr" @@ -227,12 +229,18 @@ func (s *Server) ResumeNode(ctx context.Context, req *querypb.ResumeNodeRequest) return merr.Status(errors.Wrap(err, errMsg)), nil } - if s.nodeMgr.Get(req.GetNodeID()) == nil { + info := s.nodeMgr.Get(req.GetNodeID()) + if info == nil { err := merr.WrapErrNodeNotFound(req.GetNodeID(), errMsg) log.Warn(errMsg, zap.Error(err)) return merr.Status(err), nil } + if info.IsEmbeddedQueryNodeInStreamingNode() { + return merr.Status( + merr.WrapErrParameterInvalidMsg("embedded query node in streaming node can't be resumed")), nil + } + s.meta.ResourceManager.HandleNodeUp(ctx, req.GetNodeID()) return merr.Success(), nil @@ -274,6 +282,13 @@ func (s *Server) TransferSegment(ctx context.Context, req *querypb.TransferSegme err := merr.WrapErrNodeNotAvailable(srcNode, "the target node is invalid") return merr.Status(err), nil } + if streamingutil.IsStreamingServiceEnabled() { + sqn := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs() + if sqn.Contain(req.GetTargetNodeID()) { + return merr.Status( + merr.WrapErrParameterInvalidMsg("embedded query node in streaming node can't be the destination of transfer segment")), nil + } + } dstNodeSet.Insert(req.GetTargetNodeID()) } dstNodeSet.Remove(srcNode) @@ -339,7 +354,11 @@ func (s *Server) 
TransferChannel(ctx context.Context, req *querypb.TransferChann // when no dst node specified, default to use all other nodes in same dstNodeSet := typeutil.NewUniqueSet() if req.GetToAllNodes() { - dstNodeSet.Insert(replica.GetRWNodes()...) + if streamingutil.IsStreamingServiceEnabled() { + dstNodeSet.Insert(replica.GetRWSQNodes()...) + } else { + dstNodeSet.Insert(replica.GetRWNodes()...) + } } else { // check whether dstNode is healthy if err := s.isStoppingNode(ctx, req.GetTargetNodeID()); err != nil { diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 082a921bd3aa7..c99a9bf696b93 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -859,8 +859,16 @@ func (s *Server) tryHandleNodeUp() { } func (s *Server) handleNodeUp(node int64) { + nodeInfo := s.nodeMgr.Get(node) + if nodeInfo == nil { + return + } s.taskScheduler.AddExecutor(node) s.distController.StartDistInstance(s.ctx, node) + if nodeInfo.IsEmbeddedQueryNodeInStreamingNode() { + // The querynode embedded in the streaming node can not work with streaming node. + return + } // need assign to new rg and replica s.meta.ResourceManager.HandleNodeUp(s.ctx, node) } diff --git a/internal/querycoordv2/session/node_manager.go b/internal/querycoordv2/session/node_manager.go index fafd16d6b3b43..8d5bf6e0239f1 100644 --- a/internal/querycoordv2/session/node_manager.go +++ b/internal/querycoordv2/session/node_manager.go @@ -24,6 +24,7 @@ import ( "github.com/blang/semver/v4" "go.uber.org/atomic" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/metrics" ) @@ -152,6 +153,10 @@ func (n *NodeInfo) Labels() map[string]string { return n.immutableInfo.Labels } +func (n *NodeInfo) IsEmbeddedQueryNodeInStreamingNode() bool { + return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1" +} + func (n *NodeInfo) SegmentCnt() int { n.mu.RLock() defer n.mu.RUnlock() diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index fe15c685bbf0c..9f47b801bb62e 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -18,13 +18,16 @@ package utils import ( "context" + "fmt" "strings" "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -109,6 +112,13 @@ func AssignReplica(ctx context.Context, m *meta.Meta, resourceGroups []string, r "replica=[%d] resource group=[%s], resource group num can only be 0, 1 or same as replica number", replicaNumber, strings.Join(resourceGroups, ",")) } + if streamingutil.IsStreamingServiceEnabled() { + streamingNodeCount := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs().Len() + if replicaNumber > int32(streamingNodeCount) { + return nil, merr.WrapErrStreamingNodeNotEnough(streamingNodeCount, int(replicaNumber), fmt.Sprintf("when load %d replica count", replicaNumber)) + } + } + replicaNumInRG := make(map[string]int) if len(resourceGroups) == 0 { // All replicas should be spawned in default resource group. @@ -160,6 +170,9 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re } // Active recover it. 
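+ // With the streaming service enabled, the embedded streaming query nodes
+ // reported by the streaming node manager are recovered into the collection's
+ // replicas as well (see RecoverSQNodesInCollection below).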
RecoverReplicaOfCollection(ctx, m, collection) + if streamingutil.IsStreamingServiceEnabled() { + m.RecoverSQNodesInCollection(ctx, collection, snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()) + } return replicas, nil } diff --git a/internal/streamingcoord/server/balancer/balancer.go b/internal/streamingcoord/server/balancer/balancer.go index abe35d51ec6ae..98d2bd85bc141 100644 --- a/internal/streamingcoord/server/balancer/balancer.go +++ b/internal/streamingcoord/server/balancer/balancer.go @@ -3,11 +3,16 @@ package balancer import ( "context" + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var _ Balancer = (*balancerImpl)(nil) +var ( + _ Balancer = (*balancerImpl)(nil) + ErrBalancerClosed = errors.New("balancer is closed") +) // Balancer is a load balancer to balance the load of log node. // Given the balance result to assign or remove channels to corresponding log node. diff --git a/internal/streamingcoord/server/balancer/balancer_impl.go b/internal/streamingcoord/server/balancer/balancer_impl.go index 7a263a210054f..e62250e85188a 100644 --- a/internal/streamingcoord/server/balancer/balancer_impl.go +++ b/internal/streamingcoord/server/balancer/balancer_impl.go @@ -13,6 +13,7 @@ import ( "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/contextutil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -30,7 +31,10 @@ func RecoverBalancer( if err != nil { return nil, errors.Wrap(err, "fail to recover channel manager") } + ctx, cancel := context.WithCancelCause(context.Background()) b := &balancerImpl{ + ctx: ctx, + cancel: cancel, lifetime: typeutil.NewLifetime(), logger: resource.Resource().Logger().With(log.FieldComponent("balancer"), zap.String("policy", policy)), channelMetaManager: manager, @@ -44,6 +48,8 @@ func RecoverBalancer( // balancerImpl is a implementation of Balancer. type balancerImpl struct { + ctx context.Context + cancel context.CancelCauseFunc lifetime *typeutil.Lifetime logger *log.MLogger channelMetaManager *channel.ChannelManager @@ -58,6 +64,8 @@ func (b *balancerImpl) WatchChannelAssignments(ctx context.Context, cb func(vers return status.NewOnShutdownError("balancer is closing") } defer b.lifetime.Done() + + ctx, _ = contextutil.MergeContext(ctx, b.ctx) return b.channelMetaManager.WatchAssignmentResult(ctx, cb) } @@ -93,6 +101,8 @@ func (b *balancerImpl) sendRequestAndWaitFinish(ctx context.Context, newReq *req // Close close the balancer. func (b *balancerImpl) Close() { b.lifetime.SetState(typeutil.LifetimeStateStopped) + // cancel all watch opeartion by context. 
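+ // Every WatchChannelAssignments call runs under a context merged with b.ctx,
+ // so cancelling b.ctx here unblocks all pending watchers with ErrBalancerClosed
+ // before lifetime.Wait() drains the in-flight calls.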
+ b.cancel(ErrBalancerClosed) b.lifetime.Wait() b.backgroundTaskNotifier.Cancel() diff --git a/internal/streamingcoord/server/balancer/balancer_test.go b/internal/streamingcoord/server/balancer/balancer_test.go index b794527ca7ff6..427d993f50f44 100644 --- a/internal/streamingcoord/server/balancer/balancer_test.go +++ b/internal/streamingcoord/server/balancer/balancer_test.go @@ -3,6 +3,7 @@ package balancer_test import ( "context" "testing" + "time" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -16,6 +17,7 @@ import ( "github.com/milvus-io/milvus/pkg/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -91,7 +93,6 @@ func TestBalancer(t *testing.T) { b, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair") assert.NoError(t, err) assert.NotNil(t, b) - defer b.Close() b.MarkAsUnavailable(ctx, []types.PChannelInfo{{ Name: "test-channel-1", @@ -113,4 +114,18 @@ func TestBalancer(t *testing.T) { return nil }) assert.ErrorIs(t, err, doneErr) + + // create a inifite block watcher and can be interrupted by close of balancer. + f := syncutil.NewFuture[error]() + go func() { + err := b.WatchChannelAssignments(context.Background(), func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error { + return nil + }) + f.Set(err) + }() + time.Sleep(20 * time.Millisecond) + assert.False(t, f.Ready()) + + b.Close() + assert.ErrorIs(t, f.Get(), balancer.ErrBalancerClosed) } diff --git a/internal/streamingcoord/server/server.go b/internal/streamingcoord/server/server.go index cb0a7e65e85ba..aacf680252530 100644 --- a/internal/streamingcoord/server/server.go +++ b/internal/streamingcoord/server/server.go @@ -6,6 +6,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" _ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster" @@ -62,6 +63,7 @@ func (s *Server) initBasicComponent(ctx context.Context) (err error) { return struct{}{}, err } s.balancer.Set(balancer) + snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer) s.logger.Info("recover balancer done") return struct{}{}, nil })) diff --git a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go index a1d96f32bcffb..fb62c511565de 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go +++ b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go @@ -141,7 +141,7 @@ func (c *channelLifetime) Run() error { func() { go func() { c.Cancel() }() }, ) if err != nil { - handler.Close() + scanner.Close() return err } ds.Start() diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 38ceaa8b73ff3..d7b1830d4d3e2 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -48,8 +48,9 @@ const ( // DefaultServiceRoot default root path used in kv by Session DefaultServiceRoot = "session/" // DefaultIDKey default id key for Session - DefaultIDKey = "id" - SupportedLabelPrefix = "MILVUS_SERVER_LABEL_" + DefaultIDKey = "id" + SupportedLabelPrefix = 
"MILVUS_SERVER_LABEL_" + LabelStreamingNodeEmbeddedQueryNode = "QUERYNODE_STREAMING-EMBEDDED" ) // SessionEventType session event type diff --git a/internal/util/streamingutil/env.go b/internal/util/streamingutil/env.go index 8c81c685fc1bf..6a160f42d02ed 100644 --- a/internal/util/streamingutil/env.go +++ b/internal/util/streamingutil/env.go @@ -1,6 +1,10 @@ package streamingutil -import "os" +import ( + "os" + + "github.com/milvus-io/milvus/internal/util/sessionutil" +) const MilvusStreamingServiceEnabled = "MILVUS_STREAMING_SERVICE_ENABLED" @@ -16,3 +20,14 @@ func MustEnableStreamingService() { panic("start a streaming node without enabling streaming service, please set environment variable MILVUS_STREAMING_SERVICE_ENABLED = 1") } } + +// EnableEmbededQueryNode set server labels for embedded query node. +func EnableEmbededQueryNode() { + MustEnableStreamingService() + os.Setenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode, "1") +} + +// IsEmbeddedQueryNode returns whether the current node is an embedded query node in streaming node. +func IsEmbeddedQueryNode() bool { + return os.Getenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode) == "1" +} diff --git a/pkg/proto/query_coord.proto b/pkg/proto/query_coord.proto index e84ecf51584b6..d5fe974ee2c99 100644 --- a/pkg/proto/query_coord.proto +++ b/pkg/proto/query_coord.proto @@ -689,11 +689,18 @@ message ChannelNodeInfo { message Replica { int64 ID = 1; int64 collectionID = 2; + // nodes and ro_nodes can only load sealed segment. + // only manage the legacy querynode that not embedded in the streamingnode. repeated int64 nodes = 3; // all (read and write) nodes. mutual exclusive with ro_nodes. string resource_group = 4; repeated int64 ro_nodes = 5; // the in-using node but should not be assigned to these replica. - // can not load new channel or segment on it anymore. - map channel_node_infos = 6; + // cannot load segment on it anymore. + map channel_node_infos = 6; + // rw_sq_nodes and ro_sq_nodes can only watch channel and assign segment, will be removed in 3.0. + // only manage the querynode embedded in the streamingnode. + repeated int64 rw_sq_nodes = 7; // all (read and write) nodes. mutual exclusive with ro_sq_nodes. + repeated int64 ro_sq_nodes = 8; // the in-using node but should not be assigned to these replica. + // cannot watch channel on it anymore. } enum SyncType { diff --git a/pkg/proto/querypb/query_coord.pb.go b/pkg/proto/querypb/query_coord.pb.go index 8ce9c2d41034b..85226cb872730 100644 --- a/pkg/proto/querypb/query_coord.pb.go +++ b/pkg/proto/querypb/query_coord.pb.go @@ -4710,13 +4710,19 @@ type Replica struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"` - CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"` + ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"` + CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"` + // nodes and ro_nodes can only load sealed segment. + // only manage the legacy querynode that not embedded in the streamingnode. Nodes []int64 `protobuf:"varint,3,rep,packed,name=nodes,proto3" json:"nodes,omitempty"` // all (read and write) nodes. mutual exclusive with ro_nodes. 
ResourceGroup string `protobuf:"bytes,4,opt,name=resource_group,json=resourceGroup,proto3" json:"resource_group,omitempty"` RoNodes []int64 `protobuf:"varint,5,rep,packed,name=ro_nodes,json=roNodes,proto3" json:"ro_nodes,omitempty"` // the in-using node but should not be assigned to these replica. - // can not load new channel or segment on it anymore. + // cannot load segment on it anymore. ChannelNodeInfos map[string]*ChannelNodeInfo `protobuf:"bytes,6,rep,name=channel_node_infos,json=channelNodeInfos,proto3" json:"channel_node_infos,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // rw_sq_nodes and ro_sq_nodes can only watch channel and assign segment, will be removed in 3.0. + // only manage the querynode embedded in the streamingnode. + RwSqNodes []int64 `protobuf:"varint,7,rep,packed,name=rw_sq_nodes,json=rwSqNodes,proto3" json:"rw_sq_nodes,omitempty"` // all (read and write) nodes. mutual exclusive with ro_sq_nodes. + RoSqNodes []int64 `protobuf:"varint,8,rep,packed,name=ro_sq_nodes,json=roSqNodes,proto3" json:"ro_sq_nodes,omitempty"` // the in-using node but should not be assigned to these replica. } func (x *Replica) Reset() { @@ -4793,6 +4799,20 @@ func (x *Replica) GetChannelNodeInfos() map[string]*ChannelNodeInfo { return nil } +func (x *Replica) GetRwSqNodes() []int64 { + if x != nil { + return x.RwSqNodes + } + return nil +} + +func (x *Replica) GetRoSqNodes() []int64 { + if x != nil { + return x.RoSqNodes + } + return nil +} + type SyncAction struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -8023,7 +8043,7 @@ var file_query_coord_proto_rawDesc = []byte{ 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x2c, 0x0a, 0x0f, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x19, 0x0a, 0x08, 0x72, 0x77, 0x5f, 0x6e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x03, - 0x52, 0x07, 0x72, 0x77, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x22, 0xe0, 0x02, 0x0a, 0x07, 0x52, 0x65, + 0x52, 0x07, 0x72, 0x77, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x22, 0xa0, 0x03, 0x0a, 0x07, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x49, 0x44, 0x12, 0x22, 0x0a, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c, @@ -8039,7 +8059,11 @@ var file_query_coord_proto_rawDesc = []byte{ 0x72, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x10, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, - 0x6f, 0x73, 0x1a, 0x68, 0x0a, 0x15, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64, + 0x6f, 0x73, 0x12, 0x1e, 0x0a, 0x0b, 0x72, 0x77, 0x5f, 0x73, 0x71, 0x5f, 0x6e, 0x6f, 0x64, 0x65, + 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x03, 0x52, 0x09, 0x72, 0x77, 0x53, 0x71, 0x4e, 0x6f, 0x64, + 0x65, 0x73, 0x12, 0x1e, 0x0a, 0x0b, 0x72, 0x6f, 0x5f, 0x73, 0x71, 0x5f, 0x6e, 0x6f, 0x64, 0x65, + 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x03, 0x52, 0x09, 0x72, 0x6f, 0x53, 0x71, 0x4e, 0x6f, 0x64, + 0x65, 0x73, 0x1a, 0x68, 0x0a, 0x15, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 
0x79, 0x12, 0x39, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x6d, diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index f40681486f6e3..e164a6e7ab950 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -653,6 +653,15 @@ func WrapErrResourceGroupIllegalConfig(rg any, cfg any, msg ...string) error { return err } +// WrapErrStreamingNodeNotEnough wraps ErrServiceResourceInsufficient when the number of streaming nodes is less than expected +func WrapErrStreamingNodeNotEnough(current int, expected int, msg ...string) error { + err := wrapFields(ErrServiceResourceInsufficient, value("currentStreamingNode", current), value("expectedStreamingNode", expected)) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} + // go:deprecated // WrapErrResourceGroupNodeNotEnough wraps ErrResourceGroupNodeNotEnough with resource group func WrapErrResourceGroupNodeNotEnough(rg any, current any, expected any, msg ...string) error {
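For reference, the capacity guard added to AssignReplica and the new merr helper fit together roughly as in the sketch below. This is a minimal sketch, not code from the patch: the package name and the function checkStreamingReplicaCapacity are hypothetical, while streamingutil.IsStreamingServiceEnabled, snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs and merr.WrapErrStreamingNodeNotEnough are the APIs used or introduced above.

package loadcheck // hypothetical package, for illustration only

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/coordinator/snmanager"
	"github.com/milvus-io/milvus/internal/util/streamingutil"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// checkStreamingReplicaCapacity mirrors the validation added to AssignReplica:
// when the streaming service is enabled, a collection cannot be loaded with
// more replicas than there are embedded streaming query nodes.
func checkStreamingReplicaCapacity(replicaNumber int32) error {
	if !streamingutil.IsStreamingServiceEnabled() {
		return nil
	}
	current := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs().Len()
	if replicaNumber > int32(current) {
		// The returned error wraps ErrServiceResourceInsufficient and carries
		// the current and expected streaming node counts.
		return merr.WrapErrStreamingNodeNotEnough(current, int(replicaNumber),
			fmt.Sprintf("when load %d replica count", replicaNumber))
	}
	return nil
}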