Skip to content

Commit

Permalink
fix: [cherry-pick] Using zero serverID for metrics (#31519)
Browse files Browse the repository at this point in the history
Fixes: #31516
pr: #31518

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Apr 2, 2024
1 parent 5bfe26f commit 3002f94
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 46 deletions.
24 changes: 14 additions & 10 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ func (node *DataNode) initRateCollector() error {
}

// GetNodeID returns the server ID of this DataNode. If the cached
// serverID has not been assigned yet (still zero) and a session is
// available, the ID recorded on the session is returned instead, so
// callers never observe a zero ID while a session exists.
func (node *DataNode) GetNodeID() int64 {
	if node.serverID != 0 || node.session == nil {
		return node.serverID
	}
	return node.session.ServerID
}

Expand All @@ -243,24 +246,25 @@ func (node *DataNode) Init() error {
return
}

node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord, node.GetNodeID())
serverID := node.session.ServerID
log := log.Ctx(node.ctx).With(zap.String("role", typeutil.DataNodeRole), zap.Int64("nodeID", serverID))

node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord, serverID)

err := node.initRateCollector()
if err != nil {
log.Error("DataNode server init rateCollector failed", zap.Int64("node ID", node.GetNodeID()), zap.Error(err))
log.Error("DataNode server init rateCollector failed", zap.Error(err))
initError = err
return
}
log.Info("DataNode server init rateCollector done", zap.Int64("node ID", node.GetNodeID()))
log.Info("DataNode server init rateCollector done")

node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.DataNodeRole, node.GetNodeID())
log.Info("DataNode server init dispatcher client done", zap.Int64("node ID", node.GetNodeID()))
node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.DataNodeRole, serverID)
log.Info("DataNode server init dispatcher client done")

alloc, err := allocator.New(context.Background(), node.rootCoord, node.GetNodeID())
alloc, err := allocator.New(context.Background(), node.rootCoord, serverID)
if err != nil {
log.Error("failed to create id allocator",
zap.Error(err),
zap.String("role", typeutil.DataNodeRole), zap.Int64("DataNode ID", node.GetNodeID()))
log.Error("failed to create id allocator", zap.Error(err))
initError = err
return
}
Expand Down Expand Up @@ -291,7 +295,7 @@ func (node *DataNode) Init() error {
node.importScheduler = importv2.NewScheduler(node.importTaskMgr, node.syncMgr, node.chunkManager)
node.channelCheckpointUpdater = newChannelCheckpointUpdater(node)

log.Info("init datanode done", zap.Int64("nodeID", node.GetNodeID()), zap.String("Address", node.address))
log.Info("init datanode done", zap.String("Address", node.address))
})
return initError
}
Expand Down
47 changes: 15 additions & 32 deletions internal/datanode/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,54 +72,37 @@ func (node *DataNode) GetComponentStates(ctx context.Context, req *milvuspb.GetC
return states, nil
}

// FlushSegments packs flush messages into flowGraph through flushChan.
//
// DataCoord calls FlushSegments if the segment is seal&flush only.
// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
	// Resolve the server ID once so metrics and logs never see a stale/zero ID.
	serverID := node.GetNodeID()
	metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.TotalLabel).Inc()

	// Attach request context up front so every subsequent log line is tagged.
	log := log.Ctx(ctx).With(
		zap.Int64("nodeID", serverID),
		zap.Int64("collectionID", req.GetCollectionID()),
		zap.String("channelName", req.GetChannelName()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
	)
	log.Info("receive FlushSegments request")

	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
		log.Warn("failed to FlushSegments", zap.Error(err))
		return merr.Status(err), nil
	}

	// Reject requests routed to the wrong node; the caller gets a typed
	// node-not-match error rather than silently flushing on the wrong server.
	if req.GetBase().GetTargetID() != serverID {
		log.Warn("failed to FlushSegments, target node not match", zap.Int64("targetID", req.GetBase().GetTargetID()))
		return merr.Status(merr.WrapErrNodeNotMatch(req.GetBase().GetTargetID(), serverID)), nil
	}

	err := node.writeBufferManager.SealSegments(ctx, req.GetChannelName(), req.GetSegmentIDs())
	if err != nil {
		log.Warn("failed to FlushSegments", zap.Error(err))
		return merr.Status(err), nil
	}

	log.Info("success to FlushSegments")

	metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.SuccessLabel).Inc()
	return merr.Success(), nil
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/mq/msgdispatcher/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ type client struct {

// NewClient returns a dispatcher Client for the given role (e.g. DataNode)
// and node ID, backed by the provided msgstream factory. The managers map
// is eagerly initialized so dispatcher registration never writes to a nil map.
func NewClient(factory msgstream.Factory, role string, nodeID int64) Client {
	return &client{
		role:     role,
		nodeID:   nodeID,
		factory:  factory,
		managers: make(map[string]DispatcherManager),
	}
}
Expand Down

0 comments on commit 3002f94

Please sign in to comment.