diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 53bb75b79a15e..50b75f0ddb541 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -228,6 +228,9 @@ func (node *DataNode) initRateCollector() error {
 }
 
 func (node *DataNode) GetNodeID() int64 {
+	if node.serverID == 0 && node.session != nil {
+		return node.session.ServerID
+	}
 	return node.serverID
 }
 
@@ -243,24 +246,25 @@ func (node *DataNode) Init() error {
 			return
 		}
 
-		node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord, node.GetNodeID())
+		serverID := node.session.ServerID
+		log := log.Ctx(node.ctx).With(zap.String("role", typeutil.DataNodeRole), zap.Int64("nodeID", serverID))
+
+		node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord, serverID)
 
 		err := node.initRateCollector()
 		if err != nil {
-			log.Error("DataNode server init rateCollector failed", zap.Int64("node ID", node.GetNodeID()), zap.Error(err))
+			log.Error("DataNode server init rateCollector failed", zap.Error(err))
 			initError = err
 			return
 		}
-		log.Info("DataNode server init rateCollector done", zap.Int64("node ID", node.GetNodeID()))
+		log.Info("DataNode server init rateCollector done")
 
-		node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.DataNodeRole, node.GetNodeID())
-		log.Info("DataNode server init dispatcher client done", zap.Int64("node ID", node.GetNodeID()))
+		node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.DataNodeRole, serverID)
+		log.Info("DataNode server init dispatcher client done")
 
-		alloc, err := allocator.New(context.Background(), node.rootCoord, node.GetNodeID())
+		alloc, err := allocator.New(context.Background(), node.rootCoord, serverID)
 		if err != nil {
-			log.Error("failed to create id allocator",
-				zap.Error(err),
-				zap.String("role", typeutil.DataNodeRole), zap.Int64("DataNode ID", node.GetNodeID()))
+			log.Error("failed to create id allocator", zap.Error(err))
 			initError = err
 			return
 		}
@@ -291,7 +295,7 @@ func (node *DataNode) Init() error {
 		node.importScheduler = importv2.NewScheduler(node.importTaskMgr, node.syncMgr, node.chunkManager)
 		node.channelCheckpointUpdater = newChannelCheckpointUpdater(node)
 
-		log.Info("init datanode done", zap.Int64("nodeID", node.GetNodeID()), zap.String("Address", node.address))
+		log.Info("init datanode done", zap.String("Address", node.address))
 	})
 	return initError
 }
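The `GetNodeID()` hunk above is the behavioral fix in this patch: before registration assigns `node.serverID`, the accessor now falls back to the session's ID instead of returning 0. Below is a minimal sketch of that fallback; the `session` and `dataNode` types are simplified stand-ins for `sessionutil.Session` and `DataNode`, and the ID values are placeholders, not the Milvus wiring:

```go
package main

import "fmt"

// session stands in for sessionutil.Session; only ServerID is modeled.
type session struct{ ServerID int64 }

// dataNode stands in for DataNode; serverID stays zero until assigned.
type dataNode struct {
	serverID int64
	session  *session
}

// GetNodeID mirrors the patched accessor: prefer the assigned serverID,
// but fall back to the session's ServerID while serverID is still zero.
func (n *dataNode) GetNodeID() int64 {
	if n.serverID == 0 && n.session != nil {
		return n.session.ServerID
	}
	return n.serverID
}

func main() {
	n := &dataNode{session: &session{ServerID: 7}}
	fmt.Println(n.GetNodeID()) // 7: falls back to the session ID

	n.serverID = 9
	fmt.Println(n.GetNodeID()) // 9: the assigned ID wins once set
}
```

The `Init()` hunks complement this: `serverID` is resolved once and bound into the logger via `With(...)`, so the individual call sites drop their repeated `zap.Int64("node ID", ...)` fields.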
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index 57bb9eb8df54e..13868c946a743 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -72,54 +72,37 @@ func (node *DataNode) GetComponentStates(ctx context.Context, req *milvuspb.GetC
 	return states, nil
 }
 
-// FlushSegments packs flush messages into flowGraph through flushChan.
-//
-// DataCoord calls FlushSegments if the segment is seal&flush only.
-// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
-// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
 func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
-	metrics.DataNodeFlushReqCounter.WithLabelValues(
-		fmt.Sprint(node.GetNodeID()),
-		metrics.TotalLabel).Inc()
+	serverID := node.GetNodeID()
+	metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.TotalLabel).Inc()
 
-	log := log.Ctx(ctx)
+	log := log.Ctx(ctx).With(
+		zap.Int64("nodeID", serverID),
+		zap.Int64("collectionID", req.GetCollectionID()),
+		zap.String("channelName", req.GetChannelName()),
+		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
+	)
+
+	log.Info("receive FlushSegments request")
 
 	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
-		log.Warn("DataNode.FlushSegments failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err))
+		log.Warn("failed to FlushSegments", zap.Error(err))
 		return merr.Status(err), nil
 	}
 
-	serverID := node.GetSession().ServerID
 	if req.GetBase().GetTargetID() != serverID {
-		log.Warn("flush segment target id not matched",
-			zap.Int64("targetID", req.GetBase().GetTargetID()),
-			zap.Int64("serverID", serverID),
-		)
-
+		log.Warn("failed to FlushSegments, target node not matched", zap.Int64("targetID", req.GetBase().GetTargetID()))
 		return merr.Status(merr.WrapErrNodeNotMatch(req.GetBase().GetTargetID(), serverID)), nil
 	}
 
-	segmentIDs := req.GetSegmentIDs()
-	log = log.With(
-		zap.Int64("collectionID", req.GetCollectionID()),
-		zap.String("channelName", req.GetChannelName()),
-		zap.Int64s("segmentIDs", segmentIDs),
-	)
-
-	log.Info("receiving FlushSegments request")
-
-	err := node.writeBufferManager.SealSegments(ctx, req.GetChannelName(), segmentIDs)
+	err := node.writeBufferManager.SealSegments(ctx, req.GetChannelName(), req.GetSegmentIDs())
 	if err != nil {
-		log.Warn("failed to flush segments", zap.Error(err))
+		log.Warn("failed to FlushSegments", zap.Error(err))
 		return merr.Status(err), nil
 	}
 
-	// Log success flushed segments.
-	log.Info("sending segments to WriteBuffer Manager")
+	log.Info("success to FlushSegments")
 
-	metrics.DataNodeFlushReqCounter.WithLabelValues(
-		fmt.Sprint(node.GetNodeID()),
-		metrics.SuccessLabel).Inc()
+	metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.SuccessLabel).Inc()
 
 	return merr.Success(), nil
 }
diff --git a/pkg/mq/msgdispatcher/client.go b/pkg/mq/msgdispatcher/client.go
index 762bb1fe41a92..6b33334358eb2 100644
--- a/pkg/mq/msgdispatcher/client.go
+++ b/pkg/mq/msgdispatcher/client.go
@@ -53,10 +53,9 @@ type client struct {
 
 func NewClient(factory msgstream.Factory, role string, nodeID int64) Client {
 	return &client{
-		role:    role,
-		nodeID:  nodeID,
-		factory: factory,
-		// managers: typeutil.NewConcurrentMap[string, DispatcherManager](),
+		role:     role,
+		nodeID:   nodeID,
+		factory:  factory,
 		managers: make(map[string]DispatcherManager),
 	}
 }
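To make the reworked `FlushSegments` control flow easier to follow, here is a minimal sketch of the target-ID guard it performs after the health check. `flushRequest` and `errNodeNotMatch` are hypothetical stand-ins for `*datapb.FlushSegmentsRequest` and `merr.WrapErrNodeNotMatch`, not the Milvus types:

```go
package main

import "fmt"

// flushRequest is a hypothetical stand-in for *datapb.FlushSegmentsRequest;
// only the routing field the guard reads is modeled.
type flushRequest struct{ targetID int64 }

// errNodeNotMatch approximates merr.WrapErrNodeNotMatch: DataCoord
// addressed the request to a different node than the one serving it.
func errNodeNotMatch(target, actual int64) error {
	return fmt.Errorf("node not matched: expected=%d, actual=%d", target, actual)
}

// checkTarget mirrors the guard in FlushSegments: resolve the local
// server ID once, then reject requests routed to another node.
func checkTarget(req *flushRequest, serverID int64) error {
	if req.targetID != serverID {
		return errNodeNotMatch(req.targetID, serverID)
	}
	return nil
}

func main() {
	fmt.Println(checkTarget(&flushRequest{targetID: 3}, 7)) // rejected
	fmt.Println(checkTarget(&flushRequest{targetID: 7}, 7)) // <nil>: accepted
}
```

In the patch, the same logger bound at the top of the handler reports the rejection, the failure, and the success paths, so every `FlushSegments` log line carries `nodeID`, `collectionID`, `channelName`, and `segmentIDs` without repeating the fields per call.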