Skip to content

Commit

Permalink
enhance: use rated logger for high-frequency logs in dist handler
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Feb 5, 2025
1 parent f0b7446 commit 41d02bf
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 23 deletions.
11 changes: 6 additions & 5 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,12 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(ctx context.Context, br *bala
if len(nodeItemsMap) == 0 {
return nil
}
log.Info("node workload status",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.String("channelName", channelName),
zap.Stringers("nodes", lo.Values(nodeItemsMap)))

log.Ctx(ctx).WithRateGroup(fmt.Sprintf("genSegmentPlan-%d-%d", replica.GetCollectionID(), replica.GetID()), 1, 60).
RatedInfo(30, "node segment workload status",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Stringers("nodes", lo.Values(nodeItemsMap)))

// list all segment which could be balanced, and calculate node's score
for _, node := range onlineNodes {
Expand Down
18 changes: 10 additions & 8 deletions internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,10 +565,11 @@ func (b *ScoreBasedBalancer) genSegmentPlan(ctx context.Context, br *balanceRepo
return nil
}

log.Info("node segment workload status",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Stringers("nodes", lo.Values(nodeItemsMap)))
log.Ctx(ctx).WithRateGroup(fmt.Sprintf("genSegmentPlan-%d-%d", replica.GetCollectionID(), replica.GetID()), 1, 60).
RatedInfo(30, "node segment workload status",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Stringers("nodes", lo.Values(nodeItemsMap)))

// list all segment which could be balanced, and calculate node's score
for _, node := range onlineNodes {
Expand Down Expand Up @@ -633,10 +634,11 @@ func (b *ScoreBasedBalancer) genChannelPlan(ctx context.Context, br *balanceRepo
return nil
}

log.Info("node channel workload status",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Stringers("nodes", lo.Values(nodeItemsMap)))
log.Ctx(ctx).WithRateGroup(fmt.Sprintf("genSegmentPlan-%d-%d", replica.GetCollectionID(), replica.GetID()), 1, 60).
RatedInfo(30, "node channel workload status",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Stringers("nodes", lo.Values(nodeItemsMap)))

channelDist := make(map[int64][]*meta.DmChannel)
for _, node := range onlineNodes {
Expand Down
24 changes: 14 additions & 10 deletions internal/querycoordv2/dist/dist_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,13 @@ func (dh *distHandler) updateLeaderView(ctx context.Context, resp *querypb.GetDa
// check leader serviceable
if err := utils.CheckDelegatorDataReady(dh.nodeManager, dh.target, view, meta.CurrentTarget); err != nil {
view.UnServiceableError = err
log.Info("leader is not available due to distribution not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
log.Ctx(ctx).
WithRateGroup(fmt.Sprintf("distHandler.updateLeaderView.%s", view.Channel), 1, 60).
RatedInfo(10, "leader is not available due to distribution not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
continue
}

Expand All @@ -265,11 +267,13 @@ func (dh *distHandler) updateLeaderView(ctx context.Context, resp *querypb.GetDa
// make dist handler pull next distribution until all delegator is serviceable
dh.lastUpdateTs = 0
collectionsToSync.Insert(lview.Collection)
log.Info("leader is not available due to target version not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
log.Ctx(ctx).
WithRateGroup(fmt.Sprintf("distHandler.updateLeaderView.%s", view.Channel), 1, 60).
RatedInfo(10, "leader is not available due to target version not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
}
}

Expand Down

0 comments on commit 41d02bf

Please sign in to comment.