Skip to content

Commit

Permalink
Revert "fix: [cp24]Record active collections for l0Policy (#39217)" (#…
Browse files Browse the repository at this point in the history
…39576)

This reverts commit 79490d7.

Signed-off-by: zhenshan.cao <[email protected]>

Signed-off-by: zhenshan.cao <[email protected]>
  • Loading branch information
czs007 authored Jan 24, 2025
1 parent 6eadcac commit 08307a8
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 210 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,3 @@ internal/proto/**/*.pb.go
internal/core/src/pb/*.pb.h
internal/core/src/pb/*.pb.cc
**/legacypb/*.pb.go
pkg/streaming/proto/**/*.pb.go
187 changes: 90 additions & 97 deletions internal/datacoord/compaction_policy_l0.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,121 @@
package datacoord

import (
"sync"
"time"

"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type l0CompactionPolicy struct {
meta *meta
view *FullViews

activeCollections *activeCollections
emptyLoopCount *atomic.Int64
}

func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
return &l0CompactionPolicy{
meta: meta,
activeCollections: newActiveCollections(),
meta: meta,
// donot share views with other compaction policy
view: &FullViews{collections: make(map[int64][]*SegmentView)},
emptyLoopCount: atomic.NewInt64(0),
}
}

func (policy *l0CompactionPolicy) Enable() bool {
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}

// Notify policy to record the active updated(when adding a new L0 segment) collections.
func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) {
policy.activeCollections.Record(collectionID)
func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
// support config hot refresh
events := policy.generateEventForLevelZeroViewChange()
if len(events) != 0 {
// each time when triggers a compaction, the idleTicker would reset
policy.emptyLoopCount.Store(0)
return events, nil
}
policy.emptyLoopCount.Inc()

if policy.emptyLoopCount.Load() >= 3 {
policy.emptyLoopCount.Store(0)
return policy.generateEventForLevelZeroViewIDLE(), nil
}

return make(map[CompactionTriggerType][]CompactionView), nil
}

func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]CompactionView, err error) {
events = make(map[CompactionTriggerType][]CompactionView)
func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events map[CompactionTriggerType][]CompactionView) {
latestCollSegs := policy.meta.GetCompactableSegmentGroupByCollection()
latestCollIDs := lo.Keys(latestCollSegs)
viewCollIDs := lo.Keys(policy.view.collections)

// 1. Get active collections
activeColls := policy.activeCollections.GetActiveCollections()
_, diffRemove := lo.Difference(latestCollIDs, viewCollIDs)
for _, collID := range diffRemove {
delete(policy.view.collections, collID)
}

// 2. Idle collections = all collections - active collections
missCached, idleColls := lo.Difference(activeColls, lo.Keys(latestCollSegs))
policy.activeCollections.ClearMissCached(missCached...)
refreshedL0Views := policy.RefreshLevelZeroViews(latestCollSegs)
if len(refreshedL0Views) > 0 {
events = make(map[CompactionTriggerType][]CompactionView)
events[TriggerTypeLevelZeroViewChange] = refreshedL0Views
}

idleCollsSet := typeutil.NewUniqueSet(idleColls...)
activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{}
for collID, segments := range latestCollSegs {
policy.activeCollections.Read(collID)
return events
}

func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView {
var allRefreshedL0Veiws []CompactionView
for collID, segments := range latestCollSegs {
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})
if len(levelZeroSegments) == 0 {
continue
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
needRefresh, collRefreshedViews := policy.getChangedLevelZeroViews(collID, latestL0Segments)
if needRefresh {
log.Info("Refresh compaction level zero views",
zap.Int64("collectionID", collID),
zap.Strings("views", lo.Map(collRefreshedViews, func(view CompactionView, _ int) string {
return view.String()
})))
policy.view.collections[collID] = latestL0Segments
}

labelViews := policy.groupL0ViewsByPartChan(collID, GetViewsByInfo(levelZeroSegments...))
if idleCollsSet.Contain(collID) {
idleL0Views = append(idleL0Views, labelViews...)
} else {
activeL0Views = append(activeL0Views, labelViews...)
if len(collRefreshedViews) > 0 {
allRefreshedL0Veiws = append(allRefreshedL0Veiws, collRefreshedViews...)
}

}
if len(activeL0Views) > 0 {
events[TriggerTypeLevelZeroViewChange] = activeL0Views

return allRefreshedL0Veiws
}

func (policy *l0CompactionPolicy) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) (needRefresh bool, refreshed []CompactionView) {
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.Level == datapb.SegmentLevel_L0
})

if len(LevelZeroViews) == 0 && len(cachedViews) != 0 {
needRefresh = true
return
}

if len(idleL0Views) > 0 {
events[TriggerTypeLevelZeroViewIDLE] = idleL0Views
latestViews := policy.groupL0ViewsByPartChan(collID, LevelZeroViews)
for _, latestView := range latestViews {
views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool {
return v.label.Equal(latestView.GetGroupLabel())
})

if !latestView.Equal(views) {
refreshed = append(refreshed, latestView)
needRefresh = true
}
}
return
}

func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) []CompactionView {
func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView {
partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key
for _, view := range levelZeroSegments {
key := view.label.Key()
Expand All @@ -92,71 +130,26 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID,
}
}

return lo.Map(lo.Values(partChanView), func(view *LevelZeroSegmentsView, _ int) CompactionView {
return view
})
}

type activeCollection struct {
ID int64
lastRefresh time.Time
readCount *atomic.Int64
}

func newActiveCollection(ID int64) *activeCollection {
return &activeCollection{
ID: ID,
lastRefresh: time.Now(),
readCount: atomic.NewInt64(0),
}
}

type activeCollections struct {
collections map[int64]*activeCollection
collGuard sync.RWMutex
}

func newActiveCollections() *activeCollections {
return &activeCollections{
collections: make(map[int64]*activeCollection),
}
}

func (ac *activeCollections) ClearMissCached(collectionIDs ...int64) {
ac.collGuard.Lock()
defer ac.collGuard.Unlock()
lo.ForEach(collectionIDs, func(collID int64, _ int) {
delete(ac.collections, collID)
})
}

func (ac *activeCollections) Record(collectionID int64) {
ac.collGuard.Lock()
defer ac.collGuard.Unlock()
if _, ok := ac.collections[collectionID]; !ok {
ac.collections[collectionID] = newActiveCollection(collectionID)
} else {
ac.collections[collectionID].lastRefresh = time.Now()
ac.collections[collectionID].readCount.Store(0)
}
return partChanView
}

func (ac *activeCollections) Read(collectionID int64) {
ac.collGuard.Lock()
defer ac.collGuard.Unlock()
if _, ok := ac.collections[collectionID]; ok {
ac.collections[collectionID].readCount.Inc()
if ac.collections[collectionID].readCount.Load() >= 3 &&
time.Since(ac.collections[collectionID].lastRefresh) > 3*paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second) {
log.Info("Active(of deletions) collections become idle", zap.Int64("collectionID", collectionID))
delete(ac.collections, collectionID)
func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView {
events := make(map[CompactionTriggerType][]CompactionView, 0)
for collID := range policy.view.collections {
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.Level == datapb.SegmentLevel_L0
})
if len(cachedViews) > 0 {
log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event")
grouped := policy.groupL0ViewsByPartChan(collID, cachedViews)
events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped),
func(l0View *LevelZeroSegmentsView, _ int) CompactionView {
return l0View
})
log.Info("Generate TriggerTypeLevelZeroViewIDLE compaction event", zap.Int64("collectionID", collID))
break
}
}
}

func (ac *activeCollections) GetActiveCollections() []int64 {
ac.collGuard.RLock()
defer ac.collGuard.RUnlock()

return lo.Keys(ac.collections)
return events
}
Loading

0 comments on commit 08307a8

Please sign in to comment.