Skip to content

Commit

Permalink
allow datanode/datacoord's server id to be updated
Browse files Browse the repository at this point in the history
Signed-off-by: yiwangdr <[email protected]>
  • Loading branch information
yiwangdr committed Apr 29, 2024
1 parent 84a14fa commit 37a41cf
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 11 deletions.
7 changes: 7 additions & 0 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,13 @@ func (s *Server) startDataCoord() {
sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.GetServerID())
}

func (s *Server) GetServerID() int64 {
if s.session != nil {
return s.session.GetServerID()
}
return paramtable.GetNodeID()

Check warning on line 473 in internal/datacoord/server.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/server.go#L473

Added line #L473 was not covered by tests
}

func (s *Server) afterStart() {}

func (s *Server) initCluster() error {
Expand Down
10 changes: 4 additions & 6 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ var Params *paramtable.ComponentParam = paramtable.Get()
// `segmentCache` stores all flushing and flushed segments.
type DataNode struct {
ctx context.Context
serverID int64
cancel context.CancelFunc
Role string
stateCode atomic.Value // commonpb.StateCode_Initializing
Expand Down Expand Up @@ -129,7 +128,7 @@ type DataNode struct {
}

// NewDataNode will return a DataNode with abnormal state.
func NewDataNode(ctx context.Context, factory dependency.Factory, serverID int64) *DataNode {
func NewDataNode(ctx context.Context, factory dependency.Factory) *DataNode {
rand.Seed(time.Now().UnixNano())
ctx2, cancel2 := context.WithCancel(ctx)
node := &DataNode{
Expand All @@ -140,7 +139,6 @@ func NewDataNode(ctx context.Context, factory dependency.Factory, serverID int64
rootCoord: nil,
dataCoord: nil,
factory: factory,
serverID: serverID,
segmentCache: newCache(),
compactionExecutor: newCompactionExecutor(),

Expand Down Expand Up @@ -228,10 +226,10 @@ func (node *DataNode) initRateCollector() error {
}

func (node *DataNode) GetNodeID() int64 {
if node.serverID == 0 && node.session != nil {
if node.session != nil {
return node.session.ServerID
}
return node.serverID
return paramtable.GetNodeID()
}

func (node *DataNode) Init() error {
Expand All @@ -246,7 +244,7 @@ func (node *DataNode) Init() error {
return
}

serverID := node.session.ServerID
serverID := node.GetNodeID()
log := log.Ctx(node.ctx).With(zap.String("role", typeutil.DataNodeRole), zap.Int64("nodeID", serverID))

node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord, serverID)
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
// serves the corner case for etcd connection lost and missing some events
func (node *DataNode) checkWatchedList() error {
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.serverID))
prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetNodeID()))
keys, values, err := node.watchKv.LoadWithPrefix(prefix)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ var segID2SegInfo = map[int64]*datapb.SegmentInfo{

func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNode {
factory := dependency.NewDefaultFactory(true)
node := NewDataNode(ctx, factory, 1)
node := NewDataNode(ctx, factory)
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.dispClient = msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID())

Expand Down
4 changes: 2 additions & 2 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
interceptor.ClusterValidationUnaryServerInterceptor(),
interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 {
if s.serverID.Load() == 0 {
s.serverID.Store(paramtable.GetNodeID())
s.serverID.Store(s.dataCoord.(*datacoord.Server).GetServerID())
}
return s.serverID.Load()
}),
Expand All @@ -191,7 +191,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
interceptor.ClusterValidationStreamServerInterceptor(),
interceptor.ServerIDValidationStreamServerInterceptor(func() int64 {
if s.serverID.Load() == 0 {
s.serverID.Store(paramtable.GetNodeID())
s.serverID.Store(s.dataCoord.(*datacoord.Server).GetServerID())

Check warning on line 194 in internal/distributed/datacoord/service.go

View check run for this annotation

Codecov / codecov/patch

internal/distributed/datacoord/service.go#L194

Added line #L194 was not covered by tests
}
return s.serverID.Load()
}),
Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/datanode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error)
}

s.serverID.Store(paramtable.GetNodeID())
s.datanode = dn.NewDataNode(s.ctx, s.factory, s.serverID.Load())
s.datanode = dn.NewDataNode(s.ctx, s.factory)
return s, nil
}

Expand Down

0 comments on commit 37a41cf

Please sign in to comment.