Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [cherry-pick]Using zero serverID for metrics #31519

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@
}

// GetNodeID returns the server ID of this DataNode. When the cached
// serverID has not been assigned yet (still zero) and a session exists,
// it falls back to the session's ServerID so metrics and logs never
// report a zero node ID.
func (node *DataNode) GetNodeID() int64 {
	id := node.serverID
	if id == 0 && node.session != nil {
		id = node.session.ServerID
	}
	return id
}

Expand All @@ -242,24 +245,25 @@
return
}

node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord, node.GetNodeID())
serverID := node.session.ServerID
log := log.Ctx(node.ctx).With(zap.String("role", typeutil.DataNodeRole), zap.Int64("nodeID", serverID))

node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord, serverID)

err := node.initRateCollector()
if err != nil {
log.Error("DataNode server init rateCollector failed", zap.Int64("node ID", node.GetNodeID()), zap.Error(err))
log.Error("DataNode server init rateCollector failed", zap.Error(err))

Check warning on line 255 in internal/datanode/data_node.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/data_node.go#L255

Added line #L255 was not covered by tests
initError = err
return
}
log.Info("DataNode server init rateCollector done", zap.Int64("node ID", node.GetNodeID()))
log.Info("DataNode server init rateCollector done")

node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.DataNodeRole, node.GetNodeID())
log.Info("DataNode server init dispatcher client done", zap.Int64("node ID", node.GetNodeID()))
node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.DataNodeRole, serverID)
log.Info("DataNode server init dispatcher client done")

alloc, err := allocator.New(context.Background(), node.rootCoord, node.GetNodeID())
alloc, err := allocator.New(context.Background(), node.rootCoord, serverID)
if err != nil {
log.Error("failed to create id allocator",
zap.Error(err),
zap.String("role", typeutil.DataNodeRole), zap.Int64("DataNode ID", node.GetNodeID()))
log.Error("failed to create id allocator", zap.Error(err))

Check warning on line 266 in internal/datanode/data_node.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/data_node.go#L266

Added line #L266 was not covered by tests
initError = err
return
}
Expand Down Expand Up @@ -290,7 +294,7 @@

node.channelCheckpointUpdater = newChannelCheckpointUpdater(node)

log.Info("init datanode done", zap.Int64("nodeID", node.GetNodeID()), zap.String("Address", node.address))
log.Info("init datanode done", zap.String("Address", node.address))
})
return initError
}
Expand Down
47 changes: 15 additions & 32 deletions internal/datanode/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,54 +72,37 @@ func (node *DataNode) GetComponentStates(ctx context.Context, req *milvuspb.GetC
return states, nil
}

// FlushSegments packs flush messages into flowGraph through flushChan.
//
// DataCoord calls FlushSegments if the segment is seal&flush only.
// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
//
// It returns a non-OK status (with a nil error) when the node is unhealthy,
// when the request targets a different node, or when sealing fails.
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
	// Resolve the server ID once so metrics and every log line agree.
	serverID := node.GetNodeID()
	metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.TotalLabel).Inc()

	log := log.Ctx(ctx).With(
		zap.Int64("nodeID", serverID),
		zap.Int64("collectionID", req.GetCollectionID()),
		zap.String("channelName", req.GetChannelName()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
	)
	log.Info("receive FlushSegments request")

	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
		log.Warn("failed to FlushSegments", zap.Error(err))
		return merr.Status(err), nil
	}

	// Reject requests routed to the wrong node (e.g. stale DataCoord view).
	if req.GetBase().GetTargetID() != serverID {
		log.Warn("failed to FlushSegments, target node not match", zap.Int64("targetID", req.GetBase().GetTargetID()))
		return merr.Status(merr.WrapErrNodeNotMatch(req.GetBase().GetTargetID(), serverID)), nil
	}

	// Sealing hands the segments to the write-buffer manager for flushing.
	err := node.writeBufferManager.SealSegments(ctx, req.GetChannelName(), req.GetSegmentIDs())
	if err != nil {
		log.Warn("failed to FlushSegments", zap.Error(err))
		return merr.Status(err), nil
	}

	log.Info("success to FlushSegments")

	metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.SuccessLabel).Inc()
	return merr.Success(), nil
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/mq/msgdispatcher/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ type client struct {

// NewClient returns a msgdispatcher Client for the given role/nodeID.
// The managers map is eagerly initialized so dispatcher registration
// never writes to a nil map.
func NewClient(factory msgstream.Factory, role string, nodeID int64) Client {
	return &client{
		role:     role,
		nodeID:   nodeID,
		factory:  factory,
		managers: make(map[string]DispatcherManager),
	}
}
Expand Down
Loading