From 08307a868bfa8052160549cb0fade6724badd88f Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Fri, 24 Jan 2025 17:39:06 +0800 Subject: [PATCH] Revert "fix: [cp24]Record active collections for l0Policy (#39217)" (#39576) This reverts commit 79490d7be2d811838408cb609d054dbccc14f1a9. Signed-off-by: zhenshan.cao Signed-off-by: zhenshan.cao --- .gitignore | 1 - internal/datacoord/compaction_policy_l0.go | 187 +++++++++--------- .../datacoord/compaction_policy_l0_test.go | 122 ++++++------ internal/datacoord/compaction_trigger_v2.go | 90 +++++---- .../datacoord/compaction_trigger_v2_test.go | 20 +- internal/datacoord/services.go | 2 - 6 files changed, 212 insertions(+), 210 deletions(-) diff --git a/.gitignore b/.gitignore index 041d808936c0d..b6adfcbdb4b4b 100644 --- a/.gitignore +++ b/.gitignore @@ -104,4 +104,3 @@ internal/proto/**/*.pb.go internal/core/src/pb/*.pb.h internal/core/src/pb/*.pb.cc **/legacypb/*.pb.go -pkg/streaming/proto/**/*.pb.go diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go index c0fe9e66fd7d8..27df4ca29e1cb 100644 --- a/internal/datacoord/compaction_policy_l0.go +++ b/internal/datacoord/compaction_policy_l0.go @@ -1,29 +1,27 @@ package datacoord import ( - "sync" - "time" - "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" - "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) type l0CompactionPolicy struct { meta *meta + view *FullViews - activeCollections *activeCollections + emptyLoopCount *atomic.Int64 } func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy { return &l0CompactionPolicy{ - meta: meta, - activeCollections: newActiveCollections(), + meta: meta, + // donot share views with other compaction policy + view: &FullViews{collections: make(map[int64][]*SegmentView)}, + emptyLoopCount: atomic.NewInt64(0), } } @@ -31,53 +29,93 @@ func (policy *l0CompactionPolicy) Enable() bool { return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() } -// Notify policy to record the active updated(when adding a new L0 segment) collections. -func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) { - policy.activeCollections.Record(collectionID) +func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) { + // support config hot refresh + events := policy.generateEventForLevelZeroViewChange() + if len(events) != 0 { + // each time when triggers a compaction, the idleTicker would reset + policy.emptyLoopCount.Store(0) + return events, nil + } + policy.emptyLoopCount.Inc() + + if policy.emptyLoopCount.Load() >= 3 { + policy.emptyLoopCount.Store(0) + return policy.generateEventForLevelZeroViewIDLE(), nil + } + + return make(map[CompactionTriggerType][]CompactionView), nil } -func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]CompactionView, err error) { - events = make(map[CompactionTriggerType][]CompactionView) +func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events map[CompactionTriggerType][]CompactionView) { latestCollSegs := policy.meta.GetCompactableSegmentGroupByCollection() + latestCollIDs := lo.Keys(latestCollSegs) + viewCollIDs := lo.Keys(policy.view.collections) - // 1. Get active collections - activeColls := policy.activeCollections.GetActiveCollections() + _, diffRemove := lo.Difference(latestCollIDs, viewCollIDs) + for _, collID := range diffRemove { + delete(policy.view.collections, collID) + } - // 2. Idle collections = all collections - active collections - missCached, idleColls := lo.Difference(activeColls, lo.Keys(latestCollSegs)) - policy.activeCollections.ClearMissCached(missCached...) + refreshedL0Views := policy.RefreshLevelZeroViews(latestCollSegs) + if len(refreshedL0Views) > 0 { + events = make(map[CompactionTriggerType][]CompactionView) + events[TriggerTypeLevelZeroViewChange] = refreshedL0Views + } - idleCollsSet := typeutil.NewUniqueSet(idleColls...) - activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{} - for collID, segments := range latestCollSegs { - policy.activeCollections.Read(collID) + return events +} +func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView { + var allRefreshedL0Veiws []CompactionView + for collID, segments := range latestCollSegs { levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool { return info.GetLevel() == datapb.SegmentLevel_L0 }) - if len(levelZeroSegments) == 0 { - continue + latestL0Segments := GetViewsByInfo(levelZeroSegments...) + needRefresh, collRefreshedViews := policy.getChangedLevelZeroViews(collID, latestL0Segments) + if needRefresh { + log.Info("Refresh compaction level zero views", + zap.Int64("collectionID", collID), + zap.Strings("views", lo.Map(collRefreshedViews, func(view CompactionView, _ int) string { + return view.String() + }))) + policy.view.collections[collID] = latestL0Segments } - labelViews := policy.groupL0ViewsByPartChan(collID, GetViewsByInfo(levelZeroSegments...)) - if idleCollsSet.Contain(collID) { - idleL0Views = append(idleL0Views, labelViews...) - } else { - activeL0Views = append(activeL0Views, labelViews...) + if len(collRefreshedViews) > 0 { + allRefreshedL0Veiws = append(allRefreshedL0Veiws, collRefreshedViews...) } - } - if len(activeL0Views) > 0 { - events[TriggerTypeLevelZeroViewChange] = activeL0Views + + return allRefreshedL0Veiws +} + +func (policy *l0CompactionPolicy) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) (needRefresh bool, refreshed []CompactionView) { + cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool { + return v.Level == datapb.SegmentLevel_L0 + }) + + if len(LevelZeroViews) == 0 && len(cachedViews) != 0 { + needRefresh = true + return } - if len(idleL0Views) > 0 { - events[TriggerTypeLevelZeroViewIDLE] = idleL0Views + latestViews := policy.groupL0ViewsByPartChan(collID, LevelZeroViews) + for _, latestView := range latestViews { + views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool { + return v.label.Equal(latestView.GetGroupLabel()) + }) + + if !latestView.Equal(views) { + refreshed = append(refreshed, latestView) + needRefresh = true + } } return } -func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) []CompactionView { +func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView { partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key for _, view := range levelZeroSegments { key := view.label.Key() @@ -92,71 +130,26 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, } } - return lo.Map(lo.Values(partChanView), func(view *LevelZeroSegmentsView, _ int) CompactionView { - return view - }) -} - -type activeCollection struct { - ID int64 - lastRefresh time.Time - readCount *atomic.Int64 -} - -func newActiveCollection(ID int64) *activeCollection { - return &activeCollection{ - ID: ID, - lastRefresh: time.Now(), - readCount: atomic.NewInt64(0), - } -} - -type activeCollections struct { - collections map[int64]*activeCollection - collGuard sync.RWMutex -} - -func newActiveCollections() *activeCollections { - return &activeCollections{ - collections: make(map[int64]*activeCollection), - } -} - -func (ac *activeCollections) ClearMissCached(collectionIDs ...int64) { - ac.collGuard.Lock() - defer ac.collGuard.Unlock() - lo.ForEach(collectionIDs, func(collID int64, _ int) { - delete(ac.collections, collID) - }) -} - -func (ac *activeCollections) Record(collectionID int64) { - ac.collGuard.Lock() - defer ac.collGuard.Unlock() - if _, ok := ac.collections[collectionID]; !ok { - ac.collections[collectionID] = newActiveCollection(collectionID) - } else { - ac.collections[collectionID].lastRefresh = time.Now() - ac.collections[collectionID].readCount.Store(0) - } + return partChanView } -func (ac *activeCollections) Read(collectionID int64) { - ac.collGuard.Lock() - defer ac.collGuard.Unlock() - if _, ok := ac.collections[collectionID]; ok { - ac.collections[collectionID].readCount.Inc() - if ac.collections[collectionID].readCount.Load() >= 3 && - time.Since(ac.collections[collectionID].lastRefresh) > 3*paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second) { - log.Info("Active(of deletions) collections become idle", zap.Int64("collectionID", collectionID)) - delete(ac.collections, collectionID) +func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView { + events := make(map[CompactionTriggerType][]CompactionView, 0) + for collID := range policy.view.collections { + cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool { + return v.Level == datapb.SegmentLevel_L0 + }) + if len(cachedViews) > 0 { + log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event") + grouped := policy.groupL0ViewsByPartChan(collID, cachedViews) + events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped), + func(l0View *LevelZeroSegmentsView, _ int) CompactionView { + return l0View + }) + log.Info("Generate TriggerTypeLevelZeroViewIDLE compaction event", zap.Int64("collectionID", collID)) + break } } -} - -func (ac *activeCollections) GetActiveCollections() []int64 { - ac.collGuard.RLock() - defer ac.collGuard.RUnlock() - return lo.Keys(ac.collections) + return events } diff --git a/internal/datacoord/compaction_policy_l0_test.go b/internal/datacoord/compaction_policy_l0_test.go index 94d66d1fce1db..b4c7b1644d8fa 100644 --- a/internal/datacoord/compaction_policy_l0_test.go +++ b/internal/datacoord/compaction_policy_l0_test.go @@ -17,7 +17,6 @@ package datacoord import ( "testing" - "time" "github.com/stretchr/testify/suite" "go.uber.org/zap" @@ -26,7 +25,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" - "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestL0CompactionPolicySuite(t *testing.T) { @@ -45,77 +43,50 @@ type L0CompactionPolicySuite struct { l0_policy *l0CompactionPolicy } -func (s *L0CompactionPolicySuite) SetupTest() { - s.testLabel = &CompactionGroupLabel{ - CollectionID: 1, - PartitionID: 10, - Channel: "ch-1", - } - - segments := genSegmentsForMeta(s.testLabel) - meta := &meta{segments: NewSegmentsInfo()} - for id, segment := range segments { - meta.segments.SetSegment(id, segment) - } - - s.l0_policy = newL0CompactionPolicy(meta) -} - const MB = 1024 * 1024 -func (s *L0CompactionPolicySuite) TestActiveToIdle() { - paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1") - defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key) +func (s *L0CompactionPolicySuite) TestTrigger() { + s.Require().Empty(s.l0_policy.view.collections) - s.l0_policy.OnCollectionUpdate(1) - s.Require().EqualValues(1, s.l0_policy.activeCollections.GetActiveCollections()[0]) + events, err := s.l0_policy.Trigger() + s.NoError(err) + gotViews, ok := events[TriggerTypeLevelZeroViewChange] + s.True(ok) + s.NotNil(gotViews) + s.Equal(1, len(gotViews)) - <-time.After(3 * time.Second) + cView := gotViews[0] + s.Equal(s.testLabel, cView.GetGroupLabel()) + s.Equal(4, len(cView.GetSegmentsView())) + for _, view := range cView.GetSegmentsView() { + s.Equal(datapb.SegmentLevel_L0, view.Level) + } + log.Info("cView", zap.String("string", cView.String())) - for i := 0; i < 3; i++ { - gotViews, err := s.l0_policy.Trigger() + // Test for idle trigger + for i := 0; i < 2; i++ { + events, err = s.l0_policy.Trigger() s.NoError(err) - s.NotNil(gotViews) - s.NotEmpty(gotViews) - _, ok := gotViews[TriggerTypeLevelZeroViewChange] - s.True(ok) + s.Equal(0, len(events)) } + s.EqualValues(2, s.l0_policy.emptyLoopCount.Load()) - s.Empty(s.l0_policy.activeCollections.GetActiveCollections()) - gotViews, err := s.l0_policy.Trigger() + events, err = s.l0_policy.Trigger() s.NoError(err) - s.NotNil(gotViews) - s.NotEmpty(gotViews) - _, ok := gotViews[TriggerTypeLevelZeroViewIDLE] - s.True(ok) -} - -func (s *L0CompactionPolicySuite) TestTriggerIdle() { - s.Require().Empty(s.l0_policy.activeCollections.GetActiveCollections()) - - events, err := s.l0_policy.Trigger() - s.NoError(err) - s.NotEmpty(events) - - gotViews, ok := events[TriggerTypeLevelZeroViewChange] - s.False(ok) - s.Empty(gotViews) - + s.EqualValues(0, s.l0_policy.emptyLoopCount.Load()) + s.Equal(1, len(events)) gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] s.True(ok) s.NotNil(gotViews) s.Equal(1, len(gotViews)) - - cView := gotViews[0] + cView = gotViews[0] s.Equal(s.testLabel, cView.GetGroupLabel()) s.Equal(4, len(cView.GetSegmentsView())) for _, view := range cView.GetSegmentsView() { s.Equal(datapb.SegmentLevel_L0, view.Level) } log.Info("cView", zap.String("string", cView.String())) -} -func (s *L0CompactionPolicySuite) TestTriggerViewChange() { segArgs := []struct { ID UniqueID PosT Timestamp @@ -142,17 +113,34 @@ func (s *L0CompactionPolicySuite) TestTriggerViewChange() { } s.l0_policy.meta = meta - s.l0_policy.OnCollectionUpdate(s.testLabel.CollectionID) - events, err := s.l0_policy.Trigger() + events, err = s.l0_policy.Trigger() s.NoError(err) - s.Equal(1, len(events)) + gotViews, ok = events[TriggerTypeLevelZeroViewChange] + s.True(ok) + s.Equal(1, len(gotViews)) +} + +func (s *L0CompactionPolicySuite) TestGenerateEventForLevelZeroViewChange() { + s.Require().Empty(s.l0_policy.view.collections) + + events := s.l0_policy.generateEventForLevelZeroViewChange() + s.NotEmpty(events) + s.NotEmpty(s.l0_policy.view.collections) + gotViews, ok := events[TriggerTypeLevelZeroViewChange] s.True(ok) + s.NotNil(gotViews) s.Equal(1, len(gotViews)) - gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] - s.False(ok) - s.Empty(gotViews) + storedViews, ok := s.l0_policy.view.collections[s.testLabel.CollectionID] + s.True(ok) + s.NotNil(storedViews) + s.Equal(4, len(storedViews)) + + for _, view := range storedViews { + s.Equal(s.testLabel, view.label) + s.Equal(datapb.SegmentLevel_L0, view.Level) + } } func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { @@ -196,6 +184,22 @@ func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { return segments } +func (s *L0CompactionPolicySuite) SetupTest() { + s.testLabel = &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + segments := genSegmentsForMeta(s.testLabel) + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + + s.l0_policy = newL0CompactionPolicy(meta) +} + func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo { return &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index fd4f56677ea79..e09189cb6a960 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/logutil" ) @@ -39,27 +40,9 @@ const ( TriggerTypeSingle ) -func (t CompactionTriggerType) String() string { - switch t { - case TriggerTypeLevelZeroViewChange: - return "LevelZeroViewChange" - case TriggerTypeLevelZeroViewIDLE: - return "LevelZeroViewIDLE" - case TriggerTypeSegmentSizeViewChange: - return "SegmentSizeViewChange" - case TriggerTypeClustering: - return "Clustering" - case TriggerTypeSingle: - return "Single" - default: - return "" - } -} - type TriggerManager interface { Start() Stop() - OnCollectionUpdate(collectionID int64) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) } @@ -78,6 +61,10 @@ type CompactionTriggerManager struct { handler Handler allocator allocator + view *FullViews + // todo handle this lock + viewGuard lock.RWMutex + meta *meta l0Policy *l0CompactionPolicy clusteringPolicy *clusteringCompactionPolicy @@ -92,8 +79,11 @@ func NewCompactionTriggerManager(alloc allocator, handler Handler, compactionHan allocator: alloc, handler: handler, compactionHandler: compactionHandler, - meta: meta, - closeSig: make(chan struct{}), + view: &FullViews{ + collections: make(map[int64][]*SegmentView), + }, + meta: meta, + closeSig: make(chan struct{}), } m.l0Policy = newL0CompactionPolicy(meta) m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler) @@ -101,12 +91,6 @@ func NewCompactionTriggerManager(alloc allocator, handler Handler, compactionHan return m } -// OnCollectionUpdate notifies L0Policy about latest collection's L0 segment changes -// This tells the l0 triggers about which collections are active -func (m *CompactionTriggerManager) OnCollectionUpdate(collectionID int64) { - m.l0Policy.OnCollectionUpdate(collectionID) -} - func (m *CompactionTriggerManager) Start() { m.closeWg.Add(1) go m.startLoop() @@ -211,27 +195,47 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection } func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) { - log := log.Ctx(ctx) - log.Debug("Start to trigger compactions", zap.String("eventType", eventType.String())) for _, view := range views { - outView, reason := view.Trigger() - if outView == nil && eventType == TriggerTypeLevelZeroViewIDLE { - log.Info("Start to force trigger a level zero compaction") - outView, reason = view.ForceTrigger() - } - - if outView != nil { - log.Info("Success to trigger a compaction, try to submit", - zap.String("eventType", eventType.String()), - zap.String("reason", reason), - zap.String("output view", outView.String())) + switch eventType { + case TriggerTypeLevelZeroViewChange: + log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange") + outView, reason := view.Trigger() + if outView != nil { + log.Info("Success to trigger a LevelZeroCompaction output view, try to submit", + zap.String("reason", reason), + zap.String("output view", outView.String())) + m.SubmitL0ViewToScheduler(ctx, outView) + } + case TriggerTypeLevelZeroViewIDLE: + log.Debug("Start to trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") + outView, reason := view.Trigger() + if outView == nil { + log.Info("Start to force trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") + outView, reason = view.ForceTrigger() + } - switch eventType { - case TriggerTypeLevelZeroViewChange, TriggerTypeLevelZeroViewIDLE: + if outView != nil { + log.Info("Success to trigger a LevelZeroCompaction output view, try to submit", + zap.String("reason", reason), + zap.String("output view", outView.String())) m.SubmitL0ViewToScheduler(ctx, outView) - case TriggerTypeClustering: + } + case TriggerTypeClustering: + log.Debug("Start to trigger a clustering compaction by TriggerTypeClustering") + outView, reason := view.Trigger() + if outView != nil { + log.Info("Success to trigger a ClusteringCompaction output view, try to submit", + zap.String("reason", reason), + zap.String("output view", outView.String())) m.SubmitClusteringViewToScheduler(ctx, outView) - case TriggerTypeSingle: + } + case TriggerTypeSingle: + log.Debug("Start to trigger a single compaction by TriggerTypeSingle") + outView, reason := view.Trigger() + if outView != nil { + log.Info("Success to trigger a L2SingleCompaction output view, try to submit", + zap.String("reason", reason), + zap.String("output view", outView.String())) m.SubmitSingleViewToScheduler(ctx, outView) } } diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index f109a440e6e13..cad73d40dc82c 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -75,9 +75,10 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { expectedSegID := seg1.ID s.Require().Equal(1, len(latestL0Segments)) - levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments) - s.Require().Equal(1, len(levelZeroViews)) - cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) + needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments) + s.True(needRefresh) + s.Require().Equal(1, len(levelZeroView)) + cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) @@ -100,7 +101,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { return nil }).Return(nil).Once() s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe() - s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroViews) + s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroView) } func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { @@ -118,9 +119,10 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { latestL0Segments := GetViewsByInfo(levelZeroSegments...) s.Require().NotEmpty(latestL0Segments) - levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments) - s.Require().Equal(1, len(levelZeroViews)) - cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) + needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments) + s.Require().True(needRefresh) + s.Require().Equal(1, len(levelZeroView)) + cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) @@ -129,6 +131,8 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything). RunAndReturn(func(task *datapb.CompactionTask) error { s.EqualValues(19530, task.GetTriggerID()) + // s.True(signal.isGlobal) + // s.False(signal.isForce) s.EqualValues(30000, task.GetPos().GetTimestamp()) s.Equal(s.testLabel.CollectionID, task.GetCollectionID()) s.Equal(s.testLabel.PartitionID, task.GetPartitionID()) @@ -140,7 +144,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { return nil }).Return(nil).Once() s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe() - s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroViews) + s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView) } func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 362f6f6b8f864..4121438981511 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -541,8 +541,6 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath if req.GetSegLevel() == datapb.SegmentLevel_L0 { metrics.DataCoordSizeStoredL0Segment.WithLabelValues(fmt.Sprint(req.GetCollectionID())).Observe(calculateL0SegmentSize(req.GetField2StatslogPaths())) - - s.compactionTriggerManager.OnCollectionUpdate(req.GetCollectionID()) return merr.Success(), nil }