Skip to content

Commit

Permalink
allow server id to be updated
Browse files Browse the repository at this point in the history
Signed-off-by: yiwangdr <[email protected]>
  • Loading branch information
yiwangdr committed Mar 25, 2024
1 parent 31cf849 commit 6fb6a82
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
11 changes: 8 additions & 3 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/cockroachdb/errors"
clientv3 "go.etcd.io/etcd/client/v3"
uatomic "go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -83,7 +84,7 @@ var Params *paramtable.ComponentParam = paramtable.Get()
// `segmentCache` stores all flushing and flushed segments.
type DataNode struct {
ctx context.Context
serverID int64
serverID *uatomic.Int64
cancel context.CancelFunc
Role string
stateCode atomic.Value // commonpb.StateCode_Initializing
Expand Down Expand Up @@ -128,7 +129,7 @@ type DataNode struct {
}

// NewDataNode will return a DataNode with abnormal state.
func NewDataNode(ctx context.Context, factory dependency.Factory, serverID int64) *DataNode {
func NewDataNode(ctx context.Context, factory dependency.Factory, serverID *uatomic.Int64) *DataNode {
rand.Seed(time.Now().UnixNano())
ctx2, cancel2 := context.WithCancel(ctx)
node := &DataNode{
Expand Down Expand Up @@ -227,7 +228,11 @@ func (node *DataNode) initRateCollector() error {
}

func (node *DataNode) GetNodeID() int64 {
return node.serverID
if node.serverID != nil {
return node.serverID.Load()
}
log.Error("Datanode has nil server ID.", zap.Int64("Node Id from paramtable", paramtable.GetNodeID()))
return paramtable.GetNodeID()
}

func (node *DataNode) Init() error {
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
// serves the corner case for etcd connection lost and missing some events
func (node *DataNode) checkWatchedList() error {
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.serverID))
prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetNodeID()))
keys, values, err := node.watchKv.LoadWithPrefix(prefix)
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion internal/datanode/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand Down Expand Up @@ -80,10 +81,12 @@ var segID2SegInfo = map[int64]*datapb.SegmentInfo{
InsertChannel: "by-dev-rootcoord-dml-test_v1",
},
}
var nodeID atomic.Int64

func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNode {
factory := dependency.NewDefaultFactory(true)
node := NewDataNode(ctx, factory, 1)
nodeID.Store(1)
node := NewDataNode(ctx, factory, &nodeID)
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.dispClient = msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID())

Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/datanode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error)
}

s.serverID.Store(paramtable.GetNodeID())
s.datanode = dn.NewDataNode(s.ctx, s.factory, s.serverID.Load())
s.datanode = dn.NewDataNode(s.ctx, s.factory, &s.serverID)
return s, nil
}

Expand Down

0 comments on commit 6fb6a82

Please sign in to comment.