Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "fix: [cp24]Record active collections for l0Policy (#39217)" #39576

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,3 @@ internal/proto/**/*.pb.go
internal/core/src/pb/*.pb.h
internal/core/src/pb/*.pb.cc
**/legacypb/*.pb.go
pkg/streaming/proto/**/*.pb.go
187 changes: 90 additions & 97 deletions internal/datacoord/compaction_policy_l0.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,121 @@
package datacoord

import (
"sync"
"time"

"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type l0CompactionPolicy struct {
meta *meta
view *FullViews

activeCollections *activeCollections
emptyLoopCount *atomic.Int64
}

func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
return &l0CompactionPolicy{
meta: meta,
activeCollections: newActiveCollections(),
meta: meta,
// donot share views with other compaction policy
view: &FullViews{collections: make(map[int64][]*SegmentView)},
emptyLoopCount: atomic.NewInt64(0),
}
}

func (policy *l0CompactionPolicy) Enable() bool {
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}

// Notify policy to record the active updated(when adding a new L0 segment) collections.
func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) {
policy.activeCollections.Record(collectionID)
func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
// support config hot refresh
events := policy.generateEventForLevelZeroViewChange()
if len(events) != 0 {
// each time when triggers a compaction, the idleTicker would reset
policy.emptyLoopCount.Store(0)
return events, nil
}
policy.emptyLoopCount.Inc()

if policy.emptyLoopCount.Load() >= 3 {
policy.emptyLoopCount.Store(0)
return policy.generateEventForLevelZeroViewIDLE(), nil
}

return make(map[CompactionTriggerType][]CompactionView), nil
}

func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]CompactionView, err error) {
events = make(map[CompactionTriggerType][]CompactionView)
func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events map[CompactionTriggerType][]CompactionView) {
latestCollSegs := policy.meta.GetCompactableSegmentGroupByCollection()
latestCollIDs := lo.Keys(latestCollSegs)
viewCollIDs := lo.Keys(policy.view.collections)

// 1. Get active collections
activeColls := policy.activeCollections.GetActiveCollections()
_, diffRemove := lo.Difference(latestCollIDs, viewCollIDs)
for _, collID := range diffRemove {
delete(policy.view.collections, collID)
}

// 2. Idle collections = all collections - active collections
missCached, idleColls := lo.Difference(activeColls, lo.Keys(latestCollSegs))
policy.activeCollections.ClearMissCached(missCached...)
refreshedL0Views := policy.RefreshLevelZeroViews(latestCollSegs)
if len(refreshedL0Views) > 0 {
events = make(map[CompactionTriggerType][]CompactionView)
events[TriggerTypeLevelZeroViewChange] = refreshedL0Views
}

idleCollsSet := typeutil.NewUniqueSet(idleColls...)
activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{}
for collID, segments := range latestCollSegs {
policy.activeCollections.Read(collID)
return events
}

func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView {
var allRefreshedL0Veiws []CompactionView
for collID, segments := range latestCollSegs {
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})
if len(levelZeroSegments) == 0 {
continue
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
needRefresh, collRefreshedViews := policy.getChangedLevelZeroViews(collID, latestL0Segments)
if needRefresh {
log.Info("Refresh compaction level zero views",
zap.Int64("collectionID", collID),
zap.Strings("views", lo.Map(collRefreshedViews, func(view CompactionView, _ int) string {
return view.String()
})))
policy.view.collections[collID] = latestL0Segments
}

labelViews := policy.groupL0ViewsByPartChan(collID, GetViewsByInfo(levelZeroSegments...))
if idleCollsSet.Contain(collID) {
idleL0Views = append(idleL0Views, labelViews...)
} else {
activeL0Views = append(activeL0Views, labelViews...)
if len(collRefreshedViews) > 0 {
allRefreshedL0Veiws = append(allRefreshedL0Veiws, collRefreshedViews...)
}

}
if len(activeL0Views) > 0 {
events[TriggerTypeLevelZeroViewChange] = activeL0Views

return allRefreshedL0Veiws
}

func (policy *l0CompactionPolicy) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) (needRefresh bool, refreshed []CompactionView) {
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.Level == datapb.SegmentLevel_L0
})

if len(LevelZeroViews) == 0 && len(cachedViews) != 0 {
needRefresh = true
return
}

if len(idleL0Views) > 0 {
events[TriggerTypeLevelZeroViewIDLE] = idleL0Views
latestViews := policy.groupL0ViewsByPartChan(collID, LevelZeroViews)
for _, latestView := range latestViews {
views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool {
return v.label.Equal(latestView.GetGroupLabel())
})

if !latestView.Equal(views) {
refreshed = append(refreshed, latestView)
needRefresh = true
}
}
return
}

func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) []CompactionView {
func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView {
partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key
for _, view := range levelZeroSegments {
key := view.label.Key()
Expand All @@ -92,71 +130,26 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID,
}
}

return lo.Map(lo.Values(partChanView), func(view *LevelZeroSegmentsView, _ int) CompactionView {
return view
})
}

type activeCollection struct {
ID int64
lastRefresh time.Time
readCount *atomic.Int64
}

func newActiveCollection(ID int64) *activeCollection {
return &activeCollection{
ID: ID,
lastRefresh: time.Now(),
readCount: atomic.NewInt64(0),
}
}

type activeCollections struct {
collections map[int64]*activeCollection
collGuard sync.RWMutex
}

func newActiveCollections() *activeCollections {
return &activeCollections{
collections: make(map[int64]*activeCollection),
}
}

func (ac *activeCollections) ClearMissCached(collectionIDs ...int64) {
ac.collGuard.Lock()
defer ac.collGuard.Unlock()
lo.ForEach(collectionIDs, func(collID int64, _ int) {
delete(ac.collections, collID)
})
}

func (ac *activeCollections) Record(collectionID int64) {
ac.collGuard.Lock()
defer ac.collGuard.Unlock()
if _, ok := ac.collections[collectionID]; !ok {
ac.collections[collectionID] = newActiveCollection(collectionID)
} else {
ac.collections[collectionID].lastRefresh = time.Now()
ac.collections[collectionID].readCount.Store(0)
}
return partChanView
}

func (ac *activeCollections) Read(collectionID int64) {
ac.collGuard.Lock()
defer ac.collGuard.Unlock()
if _, ok := ac.collections[collectionID]; ok {
ac.collections[collectionID].readCount.Inc()
if ac.collections[collectionID].readCount.Load() >= 3 &&
time.Since(ac.collections[collectionID].lastRefresh) > 3*paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second) {
log.Info("Active(of deletions) collections become idle", zap.Int64("collectionID", collectionID))
delete(ac.collections, collectionID)
func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView {
events := make(map[CompactionTriggerType][]CompactionView, 0)
for collID := range policy.view.collections {
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.Level == datapb.SegmentLevel_L0
})
if len(cachedViews) > 0 {
log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event")
grouped := policy.groupL0ViewsByPartChan(collID, cachedViews)
events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped),
func(l0View *LevelZeroSegmentsView, _ int) CompactionView {
return l0View
})
log.Info("Generate TriggerTypeLevelZeroViewIDLE compaction event", zap.Int64("collectionID", collID))
break
}
}
}

func (ac *activeCollections) GetActiveCollections() []int64 {
ac.collGuard.RLock()
defer ac.collGuard.RUnlock()

return lo.Keys(ac.collections)
return events
}
Loading
Loading