From fc664232ad8600a0205f5aa9e4c9ca6d99109476 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Wed, 5 Feb 2025 10:44:05 +0800 Subject: [PATCH] enhance: Remove QueryCoord's scheduling of L0 segments Signed-off-by: Wei Liu --- .../querycoordv2/checkers/index_checker.go | 7 - .../querycoordv2/checkers/leader_checker.go | 9 +- .../checkers/leader_checker_test.go | 47 ----- .../querycoordv2/checkers/segment_checker.go | 68 +------- .../checkers/segment_checker_test.go | 165 ------------------ internal/querycoordv2/dist/dist_handler.go | 12 +- internal/querycoordv2/meta/target_manager.go | 26 +-- .../querycoordv2/meta/target_manager_test.go | 4 +- .../querycoordv2/observers/target_observer.go | 1 + internal/querycoordv2/task/task_test.go | 102 ----------- internal/querycoordv2/utils/util.go | 9 +- internal/querynodev2/delegator/delegator.go | 8 +- .../querynodev2/delegator/delegator_data.go | 150 ++++++++++------ .../delegator/delegator_data_test.go | 49 +----- .../querynodev2/delegator/delegator_test.go | 2 +- .../delegator/deletebuffer/delete_buffer.go | 71 +++++++- .../deletebuffer/delete_buffer_test.go | 60 +++++++ .../deletebuffer/list_delete_buffer.go | 62 +++++++ .../deletebuffer/list_delete_buffer_test.go | 60 +++++++ .../querynodev2/delegator/delta_forward.go | 4 +- .../delegator/delta_forward_test.go | 4 +- .../querynodev2/delegator/mock_delegator.go | 65 ++++++- internal/querynodev2/handlers.go | 13 +- .../querynodev2/segments/segment_loader.go | 4 +- internal/querynodev2/services.go | 19 +- internal/querynodev2/services_test.go | 22 +-- pkg/proto/query_coord.proto | 1 + pkg/proto/querypb/query_coord.pb.go | 14 +- tests/integration/balance/balance_test.go | 20 +-- 29 files changed, 474 insertions(+), 604 deletions(-) diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go index 99fc36abf395b..ea9a04ca41888 100644 --- a/internal/querycoordv2/checkers/index_checker.go +++ b/internal/querycoordv2/checkers/index_checker.go @@ -29,7 +29,6 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/indexpb" "github.com/milvus-io/milvus/pkg/proto/querypb" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -120,12 +119,6 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec continue } - // skip update index for l0 segment - segmentInTarget := c.targetMgr.GetSealedSegment(ctx, collection.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst) - if segmentInTarget == nil || segmentInTarget.GetLevel() == datapb.SegmentLevel_L0 { - continue - } - missing := c.checkSegment(segment, indexInfos) if len(missing) > 0 { targets[segment.GetID()] = missing diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 110d58068f52d..552966e90f173 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -28,7 +28,6 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/proto/datapb" ) var _ Checker = (*LeaderChecker)(nil) @@ -164,10 +163,7 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met latestNodeDist := utils.FindMaxVersionSegments(dist) for _, s := 
range latestNodeDist { segment := c.target.GetSealedSegment(ctx, leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst) - existInTarget := segment != nil - isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0 - // shouldn't set l0 segment location to delegator. l0 segment should be reload in delegator - if !existInTarget || isL0Segment { + if segment == nil { continue } @@ -218,8 +214,7 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica *me _, ok := distMap[sid] segment := c.target.GetSealedSegment(ctx, leaderView.CollectionID, sid, meta.CurrentTargetFirst) existInTarget := segment != nil - isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0 - if ok || existInTarget || isL0Segment { + if ok || existInTarget { continue } log.Debug("leader checker append a segment to remove", diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index 1d44938f7690b..f460344bb068f 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -162,29 +162,6 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() { suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).GetLeaderID(), node2) suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1)) suite.Equal(tasks[0].Priority(), task.TaskPriorityLow) - - // test skip sync l0 segment - segments = []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - Level: datapb.SegmentLevel_L0, - }, - } - suite.broker.ExpectedCalls = nil - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - observer.target.UpdateCollectionNextTarget(ctx, int64(1)) - observer.target.UpdateCollectionCurrentTarget(ctx, 1) - // mock l0 segment exist on non delegator node, doesn't set to leader view - observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, loadVersion, "test-insert-channel")) - observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) - view.TargetVersion = observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget) - observer.dist.LeaderViewManager.Update(2, view) - tasks = suite.checker.Check(context.TODO()) - suite.Len(tasks, 0) } func (suite *LeaderCheckerTestSuite) TestActivation() { @@ -423,30 +400,6 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() { suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3)) suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0)) suite.Equal(tasks[0].Priority(), task.TaskPriorityLow) - - // skip sync l0 segments - segments := []*datapb.SegmentInfo{ - { - ID: 3, - PartitionID: 1, - InsertChannel: "test-insert-channel", - Level: datapb.SegmentLevel_L0, - }, - } - suite.broker.ExpectedCalls = nil - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - - observer.target.UpdateCollectionNextTarget(ctx, int64(1)) - observer.target.UpdateCollectionCurrentTarget(ctx, 1) - - observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 1}, map[int64]*meta.Segment{}) - view.TargetVersion = 
observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget) - observer.dist.LeaderViewManager.Update(2, view) - - tasks = suite.checker.Check(context.TODO()) - suite.Len(tasks, 0) } func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() { diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index ca6d108c5c953..1b55065bf045c 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -21,7 +21,6 @@ import ( "sort" "time" - "github.com/blang/semver/v4" "github.com/samber/lo" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -245,37 +244,8 @@ func (c *SegmentChecker) getSealedSegmentDiff( distMap[s.GetID()] = s.Node } - versionRangeFilter := semver.MustParseRange(">2.3.x") - checkLeaderVersion := func(leader *meta.LeaderView, segmentID int64) bool { - // if current shard leader's node version < 2.4, skip load L0 segment - info := c.nodeMgr.Get(leader.ID) - if info != nil && !versionRangeFilter(info.Version()) { - log.Warn("l0 segment is not supported in current node version, skip it", - zap.Int64("collection", replica.GetCollectionID()), - zap.Int64("segmentID", segmentID), - zap.String("channel", leader.Channel), - zap.Int64("leaderID", leader.ID), - zap.String("nodeVersion", info.Version().String())) - return false - } - return true - } - isSegmentLack := func(segment *datapb.SegmentInfo) bool { - node, existInDist := distMap[segment.ID] - - if segment.GetLevel() == datapb.SegmentLevel_L0 { - // the L0 segments have to been in the same node as the channel watched - leader := c.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel())) - - // if the leader node's version doesn't match load l0 segment's requirement, skip it - if leader != nil && checkLeaderVersion(leader, segment.ID) { - l0WithWrongLocation := node != leader.ID - return !existInDist || l0WithWrongLocation - } - return false - } - + _, existInDist := distMap[segment.ID] return !existInDist } @@ -290,18 +260,6 @@ func (c *SegmentChecker) getSealedSegmentDiff( } } - // l0 Segment which exist on current target, but not on dist - for _, segment := range currentTargetMap { - // to avoid generate duplicate segment task - if nextTargetMap[segment.ID] != nil { - continue - } - - if isSegmentLack(segment) { - toLoad = append(toLoad, segment) - } - } - // get segment which exist on dist, but not on current target and next target for _, segment := range dist { _, existOnCurrent := currentTargetMap[segment.GetID()] @@ -313,16 +271,6 @@ func (c *SegmentChecker) getSealedSegmentDiff( } } - level0Segments := lo.Filter(toLoad, func(segment *datapb.SegmentInfo, _ int) bool { - return segment.GetLevel() == datapb.SegmentLevel_L0 - }) - // L0 segment found, - // QueryCoord loads the L0 segments first, - // to make sure all L0 delta logs will be delivered to the other segments. 
- if len(level0Segments) > 0 { - toLoad = level0Segments - } - return } @@ -336,14 +284,6 @@ func (c *SegmentChecker) findRepeatedSealedSegments(ctx context.Context, replica dist := c.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithReplica(replica)) versions := make(map[int64]*meta.Segment) for _, s := range dist { - // l0 segment should be release with channel together - segment := c.targetMgr.GetSealedSegment(ctx, s.GetCollectionID(), s.GetID(), meta.CurrentTargetFirst) - existInTarget := segment != nil - isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0 - if isL0Segment { - continue - } - maxVer, ok := versions[s.GetID()] if !ok { versions[s.GetID()] = s @@ -408,7 +348,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] return nil } - isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0 shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string { return s.GetInsertChannel() }) @@ -426,11 +365,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] rwNodes = replica.GetRWNodes() } - // L0 segment can only be assign to shard leader's node - if isLevel0 { - rwNodes = []int64{leader.ID} - } - segmentInfos := lo.Map(segments, func(s *datapb.SegmentInfo, _ int) *meta.Segment { return &meta.Segment{ SegmentInfo: s, diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 30000c18016a1..19208ed0dcc0b 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -33,7 +33,6 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -170,170 +169,6 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() { suite.Len(tasks, 1) } -func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() { - ctx := context.Background() - checker := suite.checker - // set meta - checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(1, 1)) - checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1)) - checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(1, 1, []int64{1, 2})) - suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: 1, - Address: "localhost", - Hostname: "localhost", - Version: common.Version, - })) - suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: 2, - Address: "localhost", - Hostname: "localhost", - Version: common.Version, - })) - checker.meta.ResourceManager.HandleNodeUp(ctx, 1) - checker.meta.ResourceManager.HandleNodeUp(ctx, 2) - - // set target - segments := []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - Level: datapb.SegmentLevel_L0, - }, - } - - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: "test-insert-channel", - }, - } - - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1)) - - // set dist - checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - 
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) - - // test load l0 segments in next target - tasks := checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Len(tasks[0].Actions(), 1) - action, ok := tasks[0].Actions()[0].(*task.SegmentAction) - suite.True(ok) - suite.EqualValues(1, tasks[0].ReplicaID()) - suite.Equal(task.ActionTypeGrow, action.Type()) - suite.EqualValues(1, action.GetSegmentID()) - suite.EqualValues(2, action.Node()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - - checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1)) - // test load l0 segments in current target - tasks = checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Len(tasks[0].Actions(), 1) - action, ok = tasks[0].Actions()[0].(*task.SegmentAction) - suite.True(ok) - suite.EqualValues(1, tasks[0].ReplicaID()) - suite.Equal(task.ActionTypeGrow, action.Type()) - suite.EqualValues(1, action.GetSegmentID()) - suite.EqualValues(2, action.Node()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - - // seg l0 segment exist on a non delegator node - checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1)) - checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel")) - // test load l0 segments to delegator - tasks = checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Len(tasks[0].Actions(), 1) - action, ok = tasks[0].Actions()[0].(*task.SegmentAction) - suite.True(ok) - suite.EqualValues(1, tasks[0].ReplicaID()) - suite.Equal(task.ActionTypeGrow, action.Type()) - suite.EqualValues(1, action.GetSegmentID()) - suite.EqualValues(2, action.Node()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) -} - -func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() { - ctx := context.Background() - checker := suite.checker - // set meta - checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(1, 1)) - checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1)) - checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(1, 1, []int64{1, 2})) - suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: 1, - Address: "localhost", - Hostname: "localhost", - })) - suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: 2, - Address: "localhost", - Hostname: "localhost", - })) - checker.meta.ResourceManager.HandleNodeUp(ctx, 1) - checker.meta.ResourceManager.HandleNodeUp(ctx, 2) - - // set target - segments := []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - Level: datapb.SegmentLevel_L0, - }, - } - - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: "test-insert-channel", - }, - } - - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1)) - checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1)) - - // set dist - checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) - - // seg l0 segment exist on a non delegator node - checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel")) - checker.dist.SegmentDistManager.Update(2, 
utils.CreateTestSegment(1, 1, 1, 2, 100, "test-insert-channel"))
-
-	// release duplicate l0 segment
-	tasks := checker.Check(context.TODO())
-	suite.Len(tasks, 0)
-
-	checker.dist.SegmentDistManager.Update(1)
-
-	// test release l0 segment which doesn't exist in target
-	suite.broker.ExpectedCalls = nil
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-		channels, nil, nil)
-	checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
-	checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1))
-	checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
-
-	tasks = checker.Check(context.TODO())
-	suite.Len(tasks, 1)
-	suite.Len(tasks[0].Actions(), 1)
-	action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
-	suite.True(ok)
-	suite.EqualValues(1, tasks[0].ReplicaID())
-	suite.Equal(task.ActionTypeReduce, action.Type())
-	suite.EqualValues(1, action.GetSegmentID())
-	suite.EqualValues(2, action.Node())
-	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
-}
-
 func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
 	ctx := context.Background()
 	checker := suite.checker
diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index a42397057b034..400dbe7185db2 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -151,6 +151,11 @@ func (dh *distHandler) handleDistResp(ctx context.Context, resp *querypb.GetData
 func (dh *distHandler) updateSegmentsDistribution(ctx context.Context, resp *querypb.GetDataDistributionResponse) {
 	updates := make([]*meta.Segment, 0, len(resp.GetSegments()))
 	for _, s := range resp.GetSegments() {
+		// To maintain compatibility with older versions of QueryNode,
+		// QueryCoord should neither process nor interact with L0 segments.
+		if s.GetLevel() == datapb.SegmentLevel_L0 {
+			continue
+		}
 		segmentInfo := dh.target.GetSealedSegment(ctx, s.GetCollection(), s.GetID(), meta.CurrentTargetFirst)
 		if segmentInfo == nil {
 			segmentInfo = &datapb.SegmentInfo{
@@ -211,8 +216,13 @@ func (dh *distHandler) updateLeaderView(ctx context.Context, resp *querypb.GetDa
 	collectionsToSync := typeutil.NewUniqueSet()
 	for _, lview := range resp.GetLeaderViews() {
 		segments := make(map[int64]*meta.Segment)
 		for ID, position := range lview.GrowingSegments {
+			// To maintain compatibility with older versions of QueryNode,
+			// QueryCoord should neither process nor interact with L0 segments.
+ segmentInfo := dh.target.GetSealedSegment(ctx, lview.GetCollection(), ID, meta.CurrentTargetFirst) + if segmentInfo != nil && segmentInfo.GetLevel() == datapb.SegmentLevel_L0 { + continue + } segments[ID] = &meta.Segment{ SegmentInfo: &datapb.SegmentInfo{ ID: ID, diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 8213ee869c312..215446f6c7537 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -25,7 +25,6 @@ import ( "github.com/samber/lo" "go.uber.org/zap" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/pkg/common" @@ -159,22 +158,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec return partition.PartitionID }) - channelInfos := make(map[string][]*datapb.VchannelInfo) segments := make(map[int64]*datapb.SegmentInfo, 0) - dmChannels := make(map[string]*DmChannel) - - for _, info := range vChannelInfos { - channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info) - for _, segmentID := range info.GetLevelZeroSegmentIds() { - segments[segmentID] = &datapb.SegmentInfo{ - ID: segmentID, - CollectionID: collectionID, - InsertChannel: info.GetChannelName(), - State: commonpb.SegmentState_Flushed, - Level: datapb.SegmentLevel_L0, - } - } - } partitionSet := typeutil.NewUniqueSet(partitionIDs...) for _, segmentInfo := range segmentInfos { if partitionSet.Contain(segmentInfo.GetPartitionID()) || segmentInfo.GetPartitionID() == common.AllPartitionsID { @@ -182,9 +166,9 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec } } - for _, infos := range channelInfos { - merged := mergeDmChannelInfo(infos) - dmChannels[merged.GetChannelName()] = merged + dmChannels := make(map[string]*DmChannel) + for _, channelInfo := range vChannelInfos { + dmChannels[channelInfo.ChannelName] = DmChannelFromVChannel(channelInfo) } if len(segments) == 0 && len(dmChannels) == 0 { @@ -574,12 +558,12 @@ func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCo // if segment isn't l0 segment, and exist in current/next target, then it can be moved func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool { current := mgr.current.getCollectionTarget(collectionID) - if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 { + if current != nil && current.segments[segmentID] != nil { return true } next := mgr.next.getCollectionTarget(collectionID) - if next != nil && next.segments[segmentID] != nil && next.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 { + if next != nil && next.segments[segmentID] != nil { return true } diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 7fec53bf9de35..109f712ee0e16 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -257,7 +257,7 @@ func (suite *TargetManagerSuite) TestRemovePartition() { suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(ctx, collectionID, CurrentTarget)) suite.mgr.RemovePartition(ctx, collectionID, 100) - suite.assertSegments(append([]int64{3, 4}, suite.level0Segments...), suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, 
NextTarget)) + suite.assertSegments([]int64{3, 4}, suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, NextTarget)) suite.assertChannels(suite.channels[collectionID], suite.mgr.GetDmChannelsByCollection(ctx, collectionID, NextTarget)) suite.assertSegments([]int64{}, suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, CurrentTarget)) suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(ctx, collectionID, CurrentTarget)) @@ -303,7 +303,7 @@ func (suite *TargetManagerSuite) getAllSegment(collectionID int64, partitionIDs } } - return append(allSegments, suite.level0Segments...) + return allSegments } func (suite *TargetManagerSuite) assertChannels(expected []string, actual map[string]*DmChannel) bool { diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index c4938c453a85f..2f27b32f51354 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -511,6 +511,7 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead if channel != nil { action.Checkpoint = channel.GetSeekPosition() + action.L0InTarget = channel.GetLevelZeroSegmentIds() } return action diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index be5fc021bc768..aa3e635f5751f 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1676,108 +1676,6 @@ func (suite *TaskSuite) TestBalanceChannelTask() { suite.Equal(2, task.step) } -func (suite *TaskSuite) TestBalanceChannelWithL0SegmentTask() { - ctx := context.Background() - collectionID := int64(1) - partitionID := int64(1) - channel := "channel-1" - vchannel := &datapb.VchannelInfo{ - CollectionID: collectionID, - ChannelName: channel, - } - - segments := []*datapb.SegmentInfo{ - { - ID: 1, - CollectionID: collectionID, - PartitionID: partitionID, - InsertChannel: channel, - Level: datapb.SegmentLevel_L0, - }, - { - ID: 2, - CollectionID: collectionID, - PartitionID: partitionID, - InsertChannel: channel, - Level: datapb.SegmentLevel_L0, - }, - { - ID: 3, - CollectionID: collectionID, - PartitionID: partitionID, - InsertChannel: channel, - Level: datapb.SegmentLevel_L0, - }, - } - suite.meta.PutCollection(ctx, utils.CreateTestCollection(collectionID, 1), utils.CreateTestPartition(collectionID, 1)) - suite.broker.ExpectedCalls = nil - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return([]*datapb.VchannelInfo{vchannel}, segments, nil) - suite.target.UpdateCollectionNextTarget(ctx, collectionID) - suite.target.UpdateCollectionCurrentTarget(ctx, collectionID) - suite.target.UpdateCollectionNextTarget(ctx, collectionID) - - suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{ - ID: 2, - CollectionID: collectionID, - Channel: channel, - Segments: map[int64]*querypb.SegmentDist{ - 1: {NodeID: 2}, - 2: {NodeID: 2}, - 3: {NodeID: 2}, - }, - }) - suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{ - ID: 1, - CollectionID: collectionID, - Channel: channel, - Segments: map[int64]*querypb.SegmentDist{ - 1: {NodeID: 2}, - 2: {NodeID: 2}, - 3: {NodeID: 2}, - }, - UnServiceableError: merr.ErrSegmentLack, - }) - - task, err := NewChannelTask(context.Background(), - 10*time.Second, - WrapIDSource(2), - collectionID, - meta.NewReplica( - &querypb.Replica{ - ID: 1, - }, - typeutil.NewUniqueSet(), - ), - NewChannelAction(1, ActionTypeGrow, channel), - NewChannelAction(2, ActionTypeReduce, channel), - 
) - suite.NoError(err) - - // l0 hasn't been loaded into delegator, block balance - suite.scheduler.preProcess(task) - suite.Equal(0, task.step) - - suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{ - ID: 1, - CollectionID: collectionID, - Channel: channel, - Segments: map[int64]*querypb.SegmentDist{ - 1: {NodeID: 1}, - 2: {NodeID: 1}, - 3: {NodeID: 1}, - }, - }) - - // new delegator distribution updated, task step up - suite.scheduler.preProcess(task) - suite.Equal(1, task.step) - - suite.dist.LeaderViewManager.Update(2) - // old delegator removed - suite.scheduler.preProcess(task) - suite.Equal(2, task.step) -} - func (suite *TaskSuite) TestGetTasksJSON() { ctx := context.Background() scheduler := suite.newScheduler() diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 8d13056c41227..1ecf9eaf1f077 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -27,7 +27,6 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/querypb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -70,18 +69,12 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target } segmentDist := targetMgr.GetSealedSegmentsByChannel(context.TODO(), leader.CollectionID, leader.Channel, scope) // Check whether segments are fully loaded - for segmentID, info := range segmentDist { + for segmentID := range segmentDist { _, exist := leader.Segments[segmentID] if !exist { log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) return merr.WrapErrSegmentLack(segmentID) } - - l0WithWrongLocation := info.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID - if l0WithWrongLocation { - log.RatedInfo(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID)) - return merr.WrapErrSegmentLack(segmentID) - } } return nil } diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 6f7f157fb8b92..65026fd60e5e3 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -79,9 +79,10 @@ type ShardDelegator interface { ProcessInsert(insertRecords map[int64]*InsertData) ProcessDelete(deleteData []*DeleteData, ts uint64) LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error + LoadL0(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error - SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) + SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition, l0InTarget []int64) GetTargetVersion() int64 GetDeleteBufferSize() (entryNum int64, memorySize int64) @@ -814,6 +815,11 @@ func (sd *shardDelegator) Close() { sd.tsCond.Broadcast() sd.lifetime.Wait() + // clean up l0 segment in delete buffer + start := time.Now() + sd.deleteBuffer.Clear() + 
log.Info("unregister all l0 segments", zap.Duration("cost", time.Since(start))) + metrics.QueryNodeDeleteBufferSize.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName) metrics.QueryNodeDeleteBufferRowNum.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName) } diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index c0ce3f516d1f8..678860dc0d0c9 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -410,6 +410,10 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg zap.Int64s("segments", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })), ) + if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 { + return merr.WrapErrServiceInternal("load L0 segment is not supported, l0 segment should only be loaded by watchChannel") + } + worker, err := sd.workerManager.GetWorker(ctx, targetNodeID) if err != nil { log.Warn("delegator failed to find worker", zap.Error(err)) @@ -420,17 +424,6 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg log.Debug("worker loads segments...") sLoad := func(ctx context.Context, req *querypb.LoadSegmentsRequest) error { - info := req.GetInfos()[0] - // put meta l0, instead of load actual delta data - if info.GetLevel() == datapb.SegmentLevel_L0 && sd.l0ForwardPolicy == L0ForwardPolicyRemoteLoad { - l0Seg, err := segments.NewL0Segment(sd.collection, segments.SegmentTypeSealed, req.GetVersion(), info) - if err != nil { - return err - } - sd.collection.Ref(1) - sd.segmentManager.Put(ctx, segments.SegmentTypeSealed, l0Seg) - return nil - } segmentID := req.GetInfos()[0].GetSegmentID() nodeID := req.GetDstNodeID() _, err, _ := sd.sf.Do(fmt.Sprintf("%d-%d", nodeID, segmentID), func() (struct{}, error) { @@ -481,51 +474,76 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg Level: info.GetLevel(), } }) - if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 { - sd.RefreshLevel0DeletionStats() - } else { - // load bloom filter only when candidate not exists - infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool { - return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID) - }) - - var bm25Stats *typeutil.ConcurrentMap[int64, map[int64]*storage.BM25Stats] - if sd.idfOracle != nil { - bm25Stats, err = sd.loader.LoadBM25Stats(ctx, req.GetCollectionID(), infos...) - if err != nil { - log.Warn("failed to load bm25 stats for segment", zap.Error(err)) - return err - } - } + // load bloom filter only when candidate not exists + infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool { + return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID) + }) - candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...) + var bm25Stats *typeutil.ConcurrentMap[int64, map[int64]*storage.BM25Stats] + if sd.idfOracle != nil { + bm25Stats, err = sd.loader.LoadBM25Stats(ctx, req.GetCollectionID(), infos...) 
 		if err != nil {
-			log.Warn("failed to load bloom filter set for segment", zap.Error(err))
+			log.Warn("failed to load bm25 stats for segment", zap.Error(err))
 			return err
 		}
+	}
 
-		log.Debug("load delete...")
-		err = sd.loadStreamDelete(ctx, candidates, bm25Stats, infos, req, targetNodeID, worker)
-		if err != nil {
-			log.Warn("load stream delete failed", zap.Error(err))
-			return err
-		}
+	candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...)
+	if err != nil {
+		log.Warn("failed to load bloom filter set for segment", zap.Error(err))
+		return err
+	}
+
+	log.Debug("load delete...")
+	err = sd.loadStreamDelete(ctx, candidates, bm25Stats, infos, req, targetNodeID, worker)
+	if err != nil {
+		log.Warn("load stream delete failed", zap.Error(err))
+		return err
 	}
 
 	// alter distribution
 	sd.distribution.AddDistributions(entries...)
 
-	partStatsToReload := make([]UniqueID, 0)
-	lo.ForEach(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) {
-		partStatsToReload = append(partStatsToReload, info.PartitionID)
-	})
+	return nil
+}
 
+// LoadL0 loads l0 segments locally.
+func (sd *shardDelegator) LoadL0(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
+	log := sd.getLogger(ctx)
+
+	segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })
+	log.Info("loading l0 segments...", zap.Int64s("segmentIDs", segmentIDs))
+
+	loaded := make([]segments.Segment, 0)
+	if sd.l0ForwardPolicy == L0ForwardPolicyRemoteLoad {
+		for _, info := range infos {
+			l0Seg, err := segments.NewL0Segment(sd.collection, segments.SegmentTypeSealed, version, info)
+			if err != nil {
+				return err
+			}
+			loaded = append(loaded, l0Seg)
+		}
+	} else {
+		var err error
+		loaded, err = sd.loader.Load(ctx, sd.collectionID, segments.SegmentTypeSealed, version, infos...)
+		if err != nil {
+			log.Warn("failed to load l0 segment", zap.Error(err))
+			return err
+		}
+	}
+
+	segmentIDs = lo.Map(loaded, func(segment segments.Segment, _ int) int64 { return segment.ID() })
+	log.Info("load l0 segments done", zap.Int64s("segmentIDs", segmentIDs))
+
+	sd.deleteBuffer.RegisterL0(loaded...)
+	// refresh l0 deletion stats after registering l0 segments
+	sd.RefreshLevel0DeletionStats()
 	return nil
 }
 
 func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) (storage.PrimaryKeys, []storage.Timestamp) {
 	// TODO: this could be large, host all L0 delete on delegator might be a dangerous, consider mmap it on local segment and stream processing it
-	level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
+	level0Segments := sd.deleteBuffer.ListL0()
 	deltaData := storage.NewDeltaData(0)
 
 	for _, segment := range level0Segments {
@@ -554,7 +572,7 @@
 }
 
 func (sd *shardDelegator) RefreshLevel0DeletionStats() {
-	level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
+	level0Segments := sd.deleteBuffer.ListL0()
 	totalSize := int64(0)
 	for _, segment := range level0Segments {
 		segment := segment.(*segments.L0Segment)
@@ -562,6 +580,13 @@
 		totalSize += lo.SumBy(pks, func(pk storage.PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
 	}
 
+	metrics.QueryNodeNumSegments.WithLabelValues(
+		fmt.Sprint(paramtable.GetNodeID()),
+		fmt.Sprint(sd.Collection()),
+		commonpb.SegmentState_Sealed.String(),
+		datapb.SegmentLevel_L0.String(),
+	).Set(float64(len(level0Segments)))
+
 	metrics.QueryNodeLevelZeroSize.WithLabelValues(
 		fmt.Sprint(paramtable.GetNodeID()),
 		fmt.Sprint(sd.collectionID),
@@ -836,14 +861,14 @@
 	log := sd.getLogger(ctx)
 
 	targetNodeID := req.GetNodeID()
-	level0Segments := typeutil.NewSet(lo.Map(sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName)), func(segment segments.Segment, _ int) int64 {
+	level0Segments := typeutil.NewSet(lo.Map(sd.deleteBuffer.ListL0(), func(segment segments.Segment, _ int) int64 {
 		return segment.ID()
 	})...)
hasLevel0 := false for _, segmentID := range req.GetSegmentIDs() { hasLevel0 = level0Segments.Contain(segmentID) if hasLevel0 { - break + return merr.WrapErrServiceInternal("release L0 segment is not supported, l0 segment should only be released by unSubChannel/SyncDataDistribution") } } @@ -930,22 +955,17 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele if releaseErr != nil { return releaseErr } - - if hasLevel0 { - sd.RefreshLevel0DeletionStats() - } - partitionsToReload := make([]UniqueID, 0) - lo.ForEach(req.GetSegmentIDs(), func(segmentID int64, _ int) { - segment := sd.segmentManager.Get(segmentID) - if segment != nil { - partitionsToReload = append(partitionsToReload, segment.Partition()) - } - }) return nil } -func (sd *shardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, - sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition, +func (sd *shardDelegator) SyncTargetVersion( + newVersion int64, + partitions []int64, + growingInTarget []int64, + sealedInTarget []int64, + droppedInTarget []int64, + checkpoint *msgpb.MsgPosition, + l0InTarget []int64, ) { growings := sd.segmentManager.GetBy( segments.WithType(segments.SegmentTypeGrowing), @@ -976,8 +996,24 @@ func (sd *shardDelegator) SyncTargetVersion(newVersion int64, partitions []int64 log.Warn("found redundant growing segments", zap.Int64s("growingSegments", redundantGrowingIDs)) } + redundantL0IDs := make([]int64, 0) + l0Set := typeutil.NewUniqueSet(l0InTarget...) + for _, l0 := range sd.deleteBuffer.ListL0() { + if !l0Set.Contain(l0.ID()) { + redundantL0IDs = append(redundantL0IDs, l0.ID()) + } + } + sd.distribution.SyncTargetVersion(newVersion, partitions, growingInTarget, sealedInTarget, redundantGrowingIDs) - sd.deleteBuffer.TryDiscard(checkpoint.GetTimestamp()) + if len(redundantL0IDs) > 0 { + start := time.Now() + sd.deleteBuffer.UnRegister(checkpoint.GetTimestamp(), redundantL0IDs...) 
+ log.Info("unregister l0 segments", + zap.Int64s("segment ids", redundantL0IDs), + zap.Duration("cost", time.Since(start)), + ) + sd.RefreshLevel0DeletionStats() + } } func (sd *shardDelegator) GetTargetVersion() int64 { diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 695ac2bcc58b3..aede41ffe26e1 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -771,44 +771,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() { }, }, }) - s.NoError(err) - - // err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{ - // Base: commonpbutil.NewMsgBase(), - // DstNodeID: 1, - // CollectionID: s.collectionID, - // Infos: []*querypb.SegmentLoadInfo{ - // { - // SegmentID: 200, - // PartitionID: 500, - // StartPosition: &msgpb.MsgPosition{Timestamp: 20000}, - // DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000}, - // Level: datapb.SegmentLevel_L1, - // InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID), - // }, - // }, - // }) - - s.NoError(err) - sealed, _ := s.delegator.GetSegmentInfo(false) - s.Require().Equal(1, len(sealed)) - s.Equal(int64(1), sealed[0].NodeID) - s.ElementsMatch([]SegmentEntry{ - { - SegmentID: 100, - NodeID: 1, - PartitionID: 500, - TargetVersion: unreadableTargetVersion, - Level: datapb.SegmentLevel_L1, - }, - { - SegmentID: 200, - NodeID: 1, - PartitionID: 500, - TargetVersion: unreadableTargetVersion, - Level: datapb.SegmentLevel_L0, - }, - }, sealed[0].Segments) + s.ErrorIs(err, merr.ErrServiceInternal) }) s.Run("load_segments_with_l0_delete_failed", func() { @@ -1500,7 +1463,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() { s.manager.Segment.Put(context.Background(), segments.SegmentTypeGrowing, ms) } - s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{}) + s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{}, []int64{}) s.Equal(int64(5), s.delegator.GetTargetVersion()) } @@ -1532,7 +1495,7 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() { NumOfRows: 1, }) l0.LoadDeltaData(context.TODO(), partitionDeleteData) - delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0) + delegator.deleteBuffer.RegisterL0(l0) l0Global, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 2, &querypb.SegmentLoadInfo{ CollectionID: 1, @@ -1550,7 +1513,7 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() { pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) s.Empty(pks) - delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global) + delegator.deleteBuffer.RegisterL0(l0Global) pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) rawPks := make([]storage.PrimaryKey, 0, pks.Len()) for i := 0; i < pks.Len(); i++ { @@ -1566,14 +1529,14 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() { s.Equal(pks.Len(), 1) s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0))) - delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All) + delegator.deleteBuffer.UnRegister(0, l0.ID()) pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) 
s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
 
 	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
 
-	delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
+	delegator.deleteBuffer.UnRegister(0, l0Global.ID())
 	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.Empty(pks)
 }
diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go
index d7cf7cb89639f..109a9afb8d256 100644
--- a/internal/querynodev2/delegator/delegator_test.go
+++ b/internal/querynodev2/delegator/delegator_test.go
@@ -325,7 +325,7 @@ func (s *DelegatorSuite) initSegments() {
 			Version: 2001,
 		},
 	)
-	s.delegator.SyncTargetVersion(2001, []int64{500, 501}, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{})
+	s.delegator.SyncTargetVersion(2001, []int64{500, 501}, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{}, []int64{})
 }
 
 func (s *DelegatorSuite) TestSearch() {
diff --git a/internal/querynodev2/delegator/deletebuffer/delete_buffer.go b/internal/querynodev2/delegator/deletebuffer/delete_buffer.go
index 91d1e8e687ef3..c0ac799552468 100644
--- a/internal/querynodev2/delegator/deletebuffer/delete_buffer.go
+++ b/internal/querynodev2/delegator/deletebuffer/delete_buffer.go
@@ -17,10 +17,14 @@
 package deletebuffer
 
 import (
+	"context"
 	"sort"
 	"sync"
 
 	"github.com/cockroachdb/errors"
+
+	"github.com/milvus-io/milvus/internal/querynodev2/segments"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 var errBufferFull = errors.New("buffer full")
@@ -39,13 +43,24 @@ type DeleteBuffer[T timed] interface {
 	TryDiscard(uint64)
 	// Size returns current size information of delete buffer: entryNum and memory
 	Size() (entryNum, memorySize int64)
+
+	// RegisterL0 registers L0 segments into the buffer
+	RegisterL0(segments ...segments.Segment)
+	// ListL0 returns all registered L0 segments
+	ListL0() []segments.Segment
+	// UnRegister releases the given L0 segments and cleans delete data before ts
+	UnRegister(ts uint64, segments ...int64)
+
+	// Clear releases all L0 segments and resets the delete buffer
+	Clear()
 }
 
 func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] {
 	return &doubleCacheBuffer[T]{
-		head:    newCacheBlock[T](startTs, maxSize),
-		maxSize: maxSize,
-		ts:      startTs,
+		head:       newCacheBlock[T](startTs, maxSize),
+		maxSize:    maxSize,
+		ts:         startTs,
+		l0Segments: make([]segments.Segment, 0),
 	}
 }
 
@@ -55,6 +70,56 @@ type doubleCacheBuffer[T timed] struct {
 	head, tail *cacheBlock[T]
 	maxSize    int64
 	ts         uint64
+
+	// maintain l0 segment list
+	l0Segments []segments.Segment
+}
+
+func (c *doubleCacheBuffer[T]) RegisterL0(segmentList ...segments.Segment) {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	// Filter out nil segments
+	for _, seg := range segmentList {
+		if seg != nil {
+			c.l0Segments = append(c.l0Segments, seg)
+		}
+	}
+}
+
+func (c *doubleCacheBuffer[T]) ListL0() []segments.Segment {
+	c.mut.RLock()
+	defer c.mut.RUnlock()
+	return c.l0Segments
+}
+
+func (c *doubleCacheBuffer[T]) UnRegister(ts uint64, segmentList ...int64) {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+	var newSegments []segments.Segment
+
+	l0Set := typeutil.NewUniqueSet(segmentList...)
+ for _, s := range c.l0Segments { + if l0Set.Contain(s.ID()) { + s.Release(context.TODO()) + continue + } + newSegments = append(newSegments, s) + } + c.l0Segments = newSegments +} + +func (c *doubleCacheBuffer[T]) Clear() { + c.mut.Lock() + defer c.mut.Unlock() + + for _, s := range c.l0Segments { + s.Release(context.TODO()) + } + c.l0Segments = nil + // reset cache block + c.tail = c.head + c.head = newCacheBlock[T](c.ts, c.maxSize) } func (c *doubleCacheBuffer[T]) SafeTs() uint64 { diff --git a/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go b/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go index 44e8c26c1f1e3..112572a80aa01 100644 --- a/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go +++ b/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go @@ -20,8 +20,10 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/storage" ) @@ -136,6 +138,64 @@ func (s *DoubleCacheBufferSuite) TestPut() { s.EqualValues(234, memorySize) } +func (s *DoubleCacheBufferSuite) TestL0SegmentOperations() { + buffer := NewDoubleCacheDeleteBuffer[*Item](10, 1000) + + // Create mock segments with specific IDs + seg1 := segments.NewMockSegment(s.T()) + seg1.On("ID").Return(int64(1)) + seg1.On("Release", mock.Anything).Return() + + seg2 := segments.NewMockSegment(s.T()) + seg2.On("ID").Return(int64(2)) + seg2.On("Release", mock.Anything).Return() + + seg3 := segments.NewMockSegment(s.T()) + seg3.On("Release", mock.Anything).Return() + + // Test RegisterL0 with multiple segments + buffer.RegisterL0(seg1, seg2) + segments := buffer.ListL0() + s.Equal(2, len(segments)) + + // Verify segment IDs by collecting them first + ids := make([]int64, 0, len(segments)) + for _, seg := range segments { + ids = append(ids, seg.ID()) + } + s.ElementsMatch([]int64{1, 2}, ids, "expected segment IDs 1 and 2 in any order") + + // Test ListL0 with empty buffer + emptyBuffer := NewDoubleCacheDeleteBuffer[*Item](10, 1000) + s.Equal(0, len(emptyBuffer.ListL0())) + + // Test UnRegister + buffer.UnRegister(15, 1) + segments = buffer.ListL0() + s.Equal(1, len(segments)) + s.Equal(int64(2), segments[0].ID()) + + // Verify Release was called on unregistered segment + seg1.AssertCalled(s.T(), "Release", mock.Anything) + + // Test Clear + buffer.RegisterL0(seg3) + s.Equal(2, len(buffer.ListL0())) + buffer.Clear() + s.Equal(0, len(buffer.ListL0())) + + // Verify Release was called on all segments + seg2.AssertCalled(s.T(), "Release", mock.Anything) + seg3.AssertCalled(s.T(), "Release", mock.Anything) + + // Test UnRegister with non-existent segment + buffer.UnRegister(20, 999) + + // Test RegisterL0 with nil segment (should not panic) + buffer.RegisterL0(nil) + s.Equal(0, len(buffer.ListL0())) +} + func TestDoubleCacheDeleteBuffer(t *testing.T) { suite.Run(t, new(DoubleCacheBufferSuite)) } diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go index 2a17e1ec6edb1..3c234580eaafc 100644 --- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go +++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go @@ -17,10 +17,13 @@ package deletebuffer import ( + "context" "sync" "github.com/cockroachdb/errors" + "github.com/samber/lo" + "github.com/milvus-io/milvus/internal/querynodev2/segments" 
"github.com/milvus-io/milvus/pkg/metrics" ) @@ -30,6 +33,7 @@ func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64, labels []s sizePerBlock: sizePerBlock, list: []*cacheBlock[T]{newCacheBlock[T](startTs, sizePerBlock)}, labels: labels, + l0Segments: make([]segments.Segment, 0), } } @@ -50,6 +54,60 @@ type listDeleteBuffer[T timed] struct { // metrics labels labels []string + + // maintain l0 segment list + l0Segments []segments.Segment +} + +func (b *listDeleteBuffer[T]) RegisterL0(segmentList ...segments.Segment) { + b.mut.Lock() + defer b.mut.Unlock() + // Filter out nil segments + for _, seg := range segmentList { + if seg != nil { + b.l0Segments = append(b.l0Segments, seg) + } + } + + b.updateMetrics() +} + +func (b *listDeleteBuffer[T]) ListL0() []segments.Segment { + b.mut.RLock() + defer b.mut.RUnlock() + return b.l0Segments +} + +func (b *listDeleteBuffer[T]) UnRegister(ts uint64, segmentList ...int64) { + b.mut.Lock() + defer b.mut.Unlock() + var newSegments []segments.Segment + + for _, s := range b.l0Segments { + if !lo.Contains(segmentList, s.ID()) { + newSegments = append(newSegments, s) + } else { + s.Release(context.TODO()) + } + } + b.l0Segments = newSegments + b.tryCleanDelete(ts) + b.updateMetrics() +} + +func (b *listDeleteBuffer[T]) Clear() { + b.mut.Lock() + defer b.mut.Unlock() + + // clean l0 segments + for _, s := range b.l0Segments { + s.Release(context.TODO()) + } + b.l0Segments = nil + + // reset cache block + b.list = []*cacheBlock[T]{newCacheBlock[T](b.safeTs, b.sizePerBlock)} + b.updateMetrics() } func (b *listDeleteBuffer[T]) updateMetrics() { @@ -93,6 +151,10 @@ func (b *listDeleteBuffer[T]) SafeTs() uint64 { func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) { b.mut.Lock() defer b.mut.Unlock() + b.tryCleanDelete(ts) +} + +func (b *listDeleteBuffer[T]) tryCleanDelete(ts uint64) { if len(b.list) == 1 { return } diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go index 0123d30addc0f..6a21a835d5086 100644 --- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go +++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go @@ -19,8 +19,10 @@ package deletebuffer import ( "testing" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/storage" ) @@ -126,6 +128,64 @@ func (s *ListDeleteBufferSuite) TestTryDiscard() { s.EqualValues(120, memorySize) } +func (s *ListDeleteBufferSuite) TestL0SegmentOperations() { + buffer := NewListDeleteBuffer[*Item](10, 1000, []string{"1", "dml-1"}) + + // Create mock segments with specific IDs + seg1 := segments.NewMockSegment(s.T()) + seg1.On("ID").Return(int64(1)) + seg1.On("Release", mock.Anything).Return() + + seg2 := segments.NewMockSegment(s.T()) + seg2.On("ID").Return(int64(2)) + seg2.On("Release", mock.Anything).Return() + + seg3 := segments.NewMockSegment(s.T()) + seg3.On("Release", mock.Anything).Return() + + // Test RegisterL0 with multiple segments + buffer.RegisterL0(seg1, seg2) + segments := buffer.ListL0() + s.Equal(2, len(segments)) + + // Verify segment IDs by collecting them first + ids := make([]int64, 0, len(segments)) + for _, seg := range segments { + ids = append(ids, seg.ID()) + } + s.ElementsMatch([]int64{1, 2}, ids, "expected segment IDs 1 and 2 in any order") + + // Test ListL0 with empty buffer + emptyBuffer := 
NewListDeleteBuffer[*Item](10, 1000, []string{}) + s.Equal(0, len(emptyBuffer.ListL0())) + + // Test UnRegister + buffer.UnRegister(15, 1) + segments = buffer.ListL0() + s.Equal(1, len(segments)) + s.Equal(int64(2), segments[0].ID()) + + // Verify Release was called on unregistered segment + seg1.AssertCalled(s.T(), "Release", mock.Anything) + + // Test Clear + buffer.RegisterL0(seg3) + s.Equal(2, len(buffer.ListL0())) + buffer.Clear() + s.Equal(0, len(buffer.ListL0())) + + // Verify Release was called on all segments + seg2.AssertCalled(s.T(), "Release", mock.Anything) + seg3.AssertCalled(s.T(), "Release", mock.Anything) + + // Test UnRegister with non-existent segment + buffer.UnRegister(20, 999) + + // Test RegisterL0 with nil segment (should not panic) + buffer.RegisterL0(nil) + s.Equal(0, len(buffer.ListL0())) +} + func TestListDeleteBuffer(t *testing.T) { suite.Run(t, new(ListDeleteBufferSuite)) } diff --git a/internal/querynodev2/delegator/delta_forward.go b/internal/querynodev2/delegator/delta_forward.go index a42353d4554d8..92c65bb9b0d95 100644 --- a/internal/querynodev2/delegator/delta_forward.go +++ b/internal/querynodev2/delegator/delta_forward.go @@ -183,9 +183,7 @@ func (sd *shardDelegator) forwardL0RemoteLoad(ctx context.Context, } func (sd *shardDelegator) getLevel0Deltalogs(partitionID int64) []*datapb.FieldBinlog { - level0Segments := sd.segmentManager.GetBy( - segments.WithLevel(datapb.SegmentLevel_L0), - segments.WithChannel(sd.vchannelName)) + level0Segments := sd.deleteBuffer.ListL0() var deltalogs []*datapb.FieldBinlog diff --git a/internal/querynodev2/delegator/delta_forward_test.go b/internal/querynodev2/delegator/delta_forward_test.go index 6dbd07710c0a3..b82b102d387b8 100644 --- a/internal/querynodev2/delegator/delta_forward_test.go +++ b/internal/querynodev2/delegator/delta_forward_test.go @@ -416,7 +416,7 @@ func (s *GrowingMergeL0Suite) TestAddL0ForGrowingBF() { } err = l0Segment.LoadDeltaData(context.Background(), deltaData) s.Require().NoError(err) - s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment) + s.delegator.deleteBuffer.RegisterL0(l0Segment) seg.EXPECT().ID().Return(10000) seg.EXPECT().Partition().Return(100) @@ -463,7 +463,7 @@ func (s *GrowingMergeL0Suite) TestAddL0ForGrowingLoad() { } err = l0Segment.LoadDeltaData(context.Background(), deltaData) s.Require().NoError(err) - s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment) + s.delegator.deleteBuffer.RegisterL0(l0Segment) seg.EXPECT().ID().Return(10000) seg.EXPECT().Partition().Return(100) diff --git a/internal/querynodev2/delegator/mock_delegator.go b/internal/querynodev2/delegator/mock_delegator.go index d61bf0a598b87..ce2221fc968d1 100644 --- a/internal/querynodev2/delegator/mock_delegator.go +++ b/internal/querynodev2/delegator/mock_delegator.go @@ -498,6 +498,54 @@ func (_c *MockShardDelegator_LoadGrowing_Call) RunAndReturn(run func(context.Con return _c } +// LoadL0 provides a mock function with given fields: ctx, infos, version +func (_m *MockShardDelegator) LoadL0(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error { + ret := _m.Called(ctx, infos, version) + + if len(ret) == 0 { + panic("no return value specified for LoadL0") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []*querypb.SegmentLoadInfo, int64) error); ok { + r0 = rf(ctx, infos, version) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockShardDelegator_LoadL0_Call is a *mock.Call that shadows Run/Return 
methods with type explicit version for method 'LoadL0' +type MockShardDelegator_LoadL0_Call struct { + *mock.Call +} + +// LoadL0 is a helper method to define mock.On call +// - ctx context.Context +// - infos []*querypb.SegmentLoadInfo +// - version int64 +func (_e *MockShardDelegator_Expecter) LoadL0(ctx interface{}, infos interface{}, version interface{}) *MockShardDelegator_LoadL0_Call { + return &MockShardDelegator_LoadL0_Call{Call: _e.mock.On("LoadL0", ctx, infos, version)} +} + +func (_c *MockShardDelegator_LoadL0_Call) Run(run func(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64)) *MockShardDelegator_LoadL0_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]*querypb.SegmentLoadInfo), args[2].(int64)) + }) + return _c +} + +func (_c *MockShardDelegator_LoadL0_Call) Return(_a0 error) *MockShardDelegator_LoadL0_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockShardDelegator_LoadL0_Call) RunAndReturn(run func(context.Context, []*querypb.SegmentLoadInfo, int64) error) *MockShardDelegator_LoadL0_Call { + _c.Call.Return(run) + return _c +} + // LoadSegments provides a mock function with given fields: ctx, req func (_m *MockShardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error { ret := _m.Called(ctx, req) @@ -985,9 +1033,9 @@ func (_c *MockShardDelegator_SyncPartitionStats_Call) RunAndReturn(run func(cont return _c } -// SyncTargetVersion provides a mock function with given fields: newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint -func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) { - _m.Called(newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint) +// SyncTargetVersion provides a mock function with given fields: newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint, l0InTarget +func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition, l0InTarget []int64) { + _m.Called(newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint, l0InTarget) } // MockShardDelegator_SyncTargetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncTargetVersion' @@ -1002,13 +1050,14 @@ type MockShardDelegator_SyncTargetVersion_Call struct { // - sealedInTarget []int64 // - droppedInTarget []int64 // - checkpoint *msgpb.MsgPosition -func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, partitions interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}, checkpoint interface{}) *MockShardDelegator_SyncTargetVersion_Call { - return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)} +// - l0InTarget []int64 +func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, partitions interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}, checkpoint interface{}, l0InTarget interface{}) *MockShardDelegator_SyncTargetVersion_Call { + return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, 
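The regenerated mock exposes the new method through the usual mockery expecter chain, so delegator tests can stub LoadL0 directly. A hedged usage sketch (it assumes the package-level NewMockShardDelegator constructor that mockery generates alongside the type; the test name is illustrative):

```go
package delegator_test

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/milvus-io/milvus/internal/querynodev2/delegator"
	"github.com/milvus-io/milvus/pkg/proto/querypb"
)

func TestLoadL0Stub(t *testing.T) {
	d := delegator.NewMockShardDelegator(t)

	// Expect exactly one LoadL0 call and inspect the forwarded load infos.
	d.EXPECT().
		LoadL0(mock.Anything, mock.Anything, mock.Anything).
		RunAndReturn(func(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
			for _, info := range infos {
				t.Logf("L0 segment %d loaded at version %d", info.GetSegmentID(), version)
			}
			return nil
		}).
		Once()

	require.NoError(t, d.LoadL0(context.Background(), nil, 1))
}
```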
partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint, l0InTarget)} } -func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call { +func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition, l0InTarget []int64)) *MockShardDelegator_SyncTargetVersion_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64), args[4].([]int64), args[5].(*msgpb.MsgPosition)) + run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64), args[4].([]int64), args[5].(*msgpb.MsgPosition), args[6].([]int64)) }) return _c } @@ -1018,7 +1067,7 @@ func (_c *MockShardDelegator_SyncTargetVersion_Call) Return() *MockShardDelegato return _c } -func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64, []int64, *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call { +func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64, []int64, *msgpb.MsgPosition, []int64)) *MockShardDelegator_SyncTargetVersion_Call { _c.Call.Return(run) return _c } diff --git a/internal/querynodev2/handlers.go b/internal/querynodev2/handlers.go index 817777606ad26..5f2023df1b91a 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -68,18 +68,7 @@ func loadL0Segments(ctx context.Context, delegator delegator.ShardDelegator, req } } - return delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{ - Base: req.GetBase(), - DstNodeID: req.GetNodeID(), - Infos: l0Segments, - Schema: req.GetSchema(), - CollectionID: req.GetCollectionID(), - LoadMeta: req.GetLoadMeta(), - ReplicaID: req.GetReplicaID(), - Version: req.GetVersion(), - NeedTransfer: false, - IndexInfoList: req.GetIndexInfoList(), - }) + return delegator.LoadL0(ctx, l0Segments, req.GetVersion()) } func loadGrowingSegments(ctx context.Context, delegator delegator.ShardDelegator, req *querypb.WatchDmChannelsRequest) error { diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 7d18749ceda18..ebadb5d8907c6 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -355,7 +355,9 @@ func (loader *segmentLoader) Load(ctx context.Context, return errors.Wrap(err, "At LoadDeltaLogs") } - loader.manager.Segment.Put(ctx, segmentType, segment) + if segment.Level() != datapb.SegmentLevel_L0 { + loader.manager.Segment.Put(ctx, segmentType, segment) + } newSegments.GetAndRemove(segmentID) loaded.Insert(segmentID, segment) loader.notifyLoadFinish(loadInfo) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index c0b2ee91d05f6..e5f0af70aefc8 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -272,17 +272,10 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm defer func() { if err != nil { node.delegators.GetAndRemove(channel.GetChannelName()) + delegator.Close() } }() - // create tSafe - // node.tSafeManager.Add(ctx, channel.ChannelName, channel.GetSeekPosition().GetTimestamp()) - // defer func() { - // if err 
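Two sides of the same invariant land in this part of the patch: WatchDmChannels now hands the filtered L0 load infos straight to the delegator through the new LoadL0 entry point, and the segment loader stops publishing L0 segments to the node-wide segment manager. The loader-side gate reduces to a sketch like this (registry, seg, and publish are illustrative stand-ins for manager.Segment and segments.Segment):

```go
package loadersketch

type level int

const (
	levelL0 level = iota // delete-only segments, owned by the delegator
	levelL1              // regular sealed segments
)

type seg struct {
	id  int64
	lvl level
}

// registry models manager.Segment: only globally visible segments live here.
type registry map[int64]seg

// publish mirrors the gate in segmentLoader.Load: L0 segments are still fully
// loaded (delta logs included) but never registered, so their data is visible
// only through the delegator that requested them.
func (r registry) publish(s seg) {
	if s.lvl == levelL0 {
		return
	}
	r[s.id] = s
}
```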
!= nil { - // node.tSafeManager.Remove(ctx, channel.ChannelName) - // } - // }() - pipeline, err := node.pipelineManager.Add(req.GetCollectionID(), channel.GetChannelName()) if err != nil { msg := "failed to create pipeline" @@ -306,9 +299,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm // remove legacy growing node.manager.Segment.RemoveBy(ctx, segments.WithChannel(channel.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing)) - // remove legacy l0 segments - node.manager.Segment.RemoveBy(ctx, segments.WithChannel(channel.GetChannelName()), - segments.WithLevel(datapb.SegmentLevel_L0)) } }() @@ -371,10 +361,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC node.pipelineManager.Remove(req.GetChannelName()) node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing)) - _, sealed := node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0)) - // node.tSafeManager.Remove(ctx, req.GetChannelName()) - - node.manager.Collection.Unref(req.GetCollectionID(), uint32(1+sealed)) + node.manager.Collection.Unref(req.GetCollectionID(), 1) } log.Info("unsubscribed channel") @@ -1316,7 +1303,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi }) shardDelegator.AddExcludedSegments(flushedInfo) shardDelegator.SyncTargetVersion(action.GetTargetVersion(), req.GetLoadMeta().GetPartitionIDs(), action.GetGrowingInTarget(), - action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint()) + action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint(), action.GetL0InTarget()) case querypb.SyncType_UpdatePartitionStats: log.Info("sync update partition stats versions") shardDelegator.SyncPartitionStats(ctx, action.PartitionStatsVersions) diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 9cdfe9224304b..c5a29a45b72af 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -543,16 +543,6 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() { // prepate suite.TestWatchDmChannelsInt64() - l0Segment := segments.NewMockSegment(suite.T()) - l0Segment.EXPECT().ID().Return(10000) - l0Segment.EXPECT().Collection().Return(suite.collectionID) - l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0) - l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed) - l0Segment.EXPECT().Shard().Return(suite.channel) - l0Segment.EXPECT().Release(ctx).Return() - - suite.node.manager.Segment.Put(ctx, segments.SegmentTypeSealed, l0Segment) - // data req := &querypb.UnsubDmChannelRequest{ Base: &commonpb.MsgBase{ @@ -567,10 +557,6 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() { status, err := suite.node.UnsubDmChannel(ctx, req) suite.NoError(merr.CheckRPCCall(status, err)) - - suite.Len(suite.node.manager.Segment.GetBy( - segments.WithChannel(suite.vchannel), - segments.WithLevel(datapb.SegmentLevel_L0)), 0) } func (suite *ServiceSuite) TestUnsubDmChannels_Failed() { @@ -1417,7 +1403,7 @@ func (suite *ServiceSuite) TestSearch_Failed() { syncVersionAction := &querypb.SyncAction{ Type: querypb.SyncType_UpdateVersion, - SealedInTarget: []int64{1, 2, 3, 4}, + SealedInTarget: []int64{1, 2, 3}, TargetVersion: time.Now().UnixMilli(), } @@ -2134,7 +2120,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() { suite.True(ok) 
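With the proto change below, QueryCoord can tell a delegator explicitly which L0 segments belong to the current target instead of shipping them around as load and release tasks; SyncDistribution simply forwards action.GetL0InTarget() into SyncTargetVersion. An illustrative UpdateVersion action (the IDs and version are made up; the fields are the ones the handler reads above):

```go
action := &querypb.SyncAction{
	Type:            querypb.SyncType_UpdateVersion,
	TargetVersion:   time.Now().UnixMilli(),
	GrowingInTarget: []int64{200},           // growing segments in the current target
	SealedInTarget:  []int64{100, 101, 102}, // sealed segments in the current target
	DroppedInTarget: []int64{50},            // segments the delegator may forget
	L0InTarget:      []int64{300, 301},      // L0 segments the delete buffer must keep
}
```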
sealedSegments, _ := delegator.GetSegmentInfo(false)
 	// 1 level 0 + 3 sealed segments
-	suite.Len(sealedSegments[0].Segments, 4)
+	suite.Len(sealedSegments[0].Segments, 3)
 
 	// data
 	req := &querypb.SyncDistributionRequest{
@@ -2158,7 +2144,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
 	suite.NoError(err)
 	suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
 	sealedSegments, _ = delegator.GetSegmentInfo(false)
-	suite.Len(sealedSegments[0].Segments, 3)
+	suite.Len(sealedSegments[0].Segments, 2)
 
 	releaseAction = &querypb.SyncAction{
 		Type: querypb.SyncType_Remove,
@@ -2172,7 +2158,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
 	suite.NoError(err)
 	suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
 	sealedSegments, _ = delegator.GetSegmentInfo(false)
-	suite.Len(sealedSegments[0].Segments, 2)
+	suite.Len(sealedSegments[0].Segments, 1)
 }
 
 func (suite *ServiceSuite) TestSyncDistribution_Failed() {
diff --git a/pkg/proto/query_coord.proto b/pkg/proto/query_coord.proto
index d5fe974ee2c99..b56bec57b0017 100644
--- a/pkg/proto/query_coord.proto
+++ b/pkg/proto/query_coord.proto
@@ -724,6 +724,7 @@ message SyncAction {
     repeated int64 droppedInTarget = 10;
     msg.MsgPosition checkpoint = 11;
     map<int64, int64> partition_stats_versions = 12;
+    repeated int64 l0InTarget = 13;
 }
 
 message SyncDistributionRequest {
diff --git a/pkg/proto/querypb/query_coord.pb.go b/pkg/proto/querypb/query_coord.pb.go
index 85226cb872730..3bf204e2484b0 100644
--- a/pkg/proto/querypb/query_coord.pb.go
+++ b/pkg/proto/querypb/query_coord.pb.go
@@ -4830,6 +4830,7 @@ type SyncAction struct {
 	DroppedInTarget []int64 `protobuf:"varint,10,rep,packed,name=droppedInTarget,proto3" json:"droppedInTarget,omitempty"`
 	Checkpoint *msgpb.MsgPosition `protobuf:"bytes,11,opt,name=checkpoint,proto3" json:"checkpoint,omitempty"`
 	PartitionStatsVersions map[int64]int64 `protobuf:"bytes,12,rep,name=partition_stats_versions,json=partitionStatsVersions,proto3" json:"partition_stats_versions,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
+	L0InTarget []int64 `protobuf:"varint,13,rep,packed,name=l0InTarget,proto3" json:"l0InTarget,omitempty"`
 }
 
 func (x *SyncAction) Reset() {
@@ -4948,6 +4949,13 @@ func (x *SyncAction) GetPartitionStatsVersions() map[int64]int64 {
 	return nil
 }
 
+func (x *SyncAction) GetL0InTarget() []int64 {
+	if x != nil {
+		return x.L0InTarget
+	}
+	return nil
+}
+
 type SyncDistributionRequest struct {
 	state protoimpl.MessageState
 	sizeCache protoimpl.SizeCache
@@ -8069,7 +8077,7 @@ var file_query_coord_proto_rawDesc = []byte{
 	0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x6d,
 	0x69, 0x6c, 0x76, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x71, 0x75, 0x65, 0x72,
 	0x79, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66,
-	0x6f, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x8b, 0x05, 0x0a,
+	0x6f, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xab, 0x05, 0x0a,
 	0x0a, 0x53, 0x79, 0x6e, 0x63, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x30, 0x0a, 0x04, 0x74,
 	0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1c, 0x2e, 0x6d, 0x69, 0x6c, 0x76,
 	0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x53,
@@ -8105,7 +8113,9 @@ var file_query_coord_proto_rawDesc = []byte{
 	0x2e, 0x53, 0x79, 0x6e, 0x63, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x50, 0x61, 0x72, 
0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69,
 	0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x16, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
-	0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x1a,
+	0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12,
+	0x1e, 0x0a, 0x0a, 0x6c, 0x30, 0x49, 0x6e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x0d, 0x20,
+	0x03, 0x28, 0x03, 0x52, 0x0a, 0x6c, 0x30, 0x49, 0x6e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x1a,
 	0x49, 0x0a, 0x1b, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74,
 	0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10,
 	0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x6b, 0x65, 0x79,
diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go
index cf2f148ae3fad..5527a10ede6c2 100644
--- a/tests/integration/balance/balance_test.go
+++ b/tests/integration/balance/balance_test.go
@@ -182,7 +182,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
 			resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
 			s.NoError(err)
 			s.True(merr.Ok(resp.GetStatus()))
-			return len(resp.Channels) == 1 && len(resp.Segments) >= 2
+			return len(resp.Channels) == 1 && len(resp.Segments) == 2
 		}, 30*time.Second, 1*time.Second)
 
 	// check total segment number and total channel number
@@ -195,7 +195,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
 			segNum += len(resp1.Segments)
 			chNum += len(resp1.Channels)
 		}
-		return segNum == 8 && chNum == 2
+		return segNum == 4 && chNum == 2
 	}, 30*time.Second, 1*time.Second)
 }
 
@@ -220,13 +220,13 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
 	s.Eventually(func() bool {
 		resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
 		s.NoError(err)
-		return len(resp.Channels) == 1 && len(resp.Segments) >= 2
+		return len(resp.Channels) == 1 && len(resp.Segments) == 2
 	}, 30*time.Second, 1*time.Second)
 
 	s.Eventually(func() bool {
 		resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
 		s.NoError(err)
-		return len(resp.Channels) == 1 && len(resp.Segments) >= 2
+		return len(resp.Channels) == 1 && len(resp.Segments) == 2
 	}, 30*time.Second, 1*time.Second)
 
 	// check total segment number and total channel number
@@ -239,7 +239,7 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
 			segNum += len(resp1.Segments)
 			chNum += len(resp1.Channels)
 		}
-		return segNum == 16 && chNum == 4
+		return segNum == 8 && chNum == 4
 	}, 30*time.Second, 1*time.Second)
 }
 
@@ -250,12 +250,12 @@ func (s *BalanceTestSuit) TestNodeDown() {
 	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceChannel.Key, "false")
 	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.EnableStoppingBalance.Key, "false")
 
-	// init collection with 3 channel, each channel has 15 segment, each segment has 2000 row
-	// and load it with 2 replicas on 2 nodes.
+	// init collection with 2 channels, each channel has 15 segments and each segment has 2000 rows,
+	// and load it with 1 replica on 2 nodes. 
name := "test_balance_" + funcutil.GenRandomStr() s.initCollection(name, 1, 2, 15, 2000, 500) - // then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments + // then we add 2 query node, after balance happens, expected each node have 10 segments qn1 := s.Cluster.AddQueryNode() qn2 := s.Cluster.AddQueryNode() @@ -264,7 +264,7 @@ func (s *BalanceTestSuit) TestNodeDown() { resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) s.True(merr.Ok(resp.GetStatus())) - log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) + log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", len(resp.Segments))) return len(resp.Channels) == 0 && len(resp.Segments) >= 10 }, 30*time.Second, 1*time.Second) @@ -295,7 +295,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 1 && len(resp.Segments) >= 15 + return len(resp.Channels) == 1 && len(resp.Segments) == 15 }, 30*time.Second, 1*time.Second) // expect all delegator will recover to healthy