diff --git a/.clang-format b/.clang-format
index 5a5e015e24700..015086cb711e8 100644
--- a/.clang-format
+++ b/.clang-format
@@ -31,4 +31,4 @@ AlignTrailingComments: true
 SortIncludes: false
 Standard: Latest
 AlignAfterOpenBracket: Align
-BinPackParameters: false
\ No newline at end of file
+BinPackParameters: false
diff --git a/internal/datacoord/channel_checker.go b/internal/datacoord/channel_checker.go
new file mode 100644
index 0000000000000..8915bf52828e5
--- /dev/null
+++ b/internal/datacoord/channel_checker.go
@@ -0,0 +1,178 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package datacoord
+
+import (
+	"fmt"
+	"path"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/milvus-io/milvus/internal/kv"
+	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.uber.org/zap"
+)
+
+type channelStateTimer struct {
+	watchkv        kv.MetaKv
+	runningTimers  sync.Map // channel name to timer stop channels
+	etcdWatcher    clientv3.WatchChan
+	timeoutWatcher chan *ackEvent
+}
+
+func newChannelStateTimer(kv kv.MetaKv) *channelStateTimer {
+	return &channelStateTimer{
+		watchkv:        kv,
+		timeoutWatcher: make(chan *ackEvent, 20),
+	}
+}
+
+func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan *ackEvent) {
+	if c.etcdWatcher == nil {
+		c.etcdWatcher = c.watchkv.WatchWithPrefix(prefix)
+	}
+	return c.etcdWatcher, c.timeoutWatcher
+}
+
+func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelWatchInfo, error) {
+	prefix := path.Join(Params.DataCoordCfg.ChannelWatchSubPath, strconv.FormatInt(nodeID, 10))
+
+	// TODO: change to LoadWithPrefixBytes
+	keys, values, err := c.watchkv.LoadWithPrefix(prefix)
+	if err != nil {
+		return nil, err
+	}
+
+	ret := []*datapb.ChannelWatchInfo{}
+
+	for i, k := range keys {
+		watchInfo, err := parseWatchInfo(k, []byte(values[i]))
+		if err != nil {
+			// TODO: delete this kv later
+			log.Warn("invalid watchInfo loaded", zap.Error(err))
+			continue
+		}
+
+		ret = append(ret, watchInfo)
+	}
+
+	return ret, nil
+}
+
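The timers that `startOne` (below) spins up follow a common per-key pattern: one goroutine per channel that either fires a timeout event into a shared channel or exits when its stop channel is closed. A minimal, self-contained sketch of that pattern, with illustrative names that are not part of this diff:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// perKeyTimer mirrors channelStateTimer: one goroutine per key that either
// pushes a timeout event or is cancelled via its stop channel.
type perKeyTimer struct {
	timers   sync.Map    // key -> chan struct{}
	timeouts chan string // analogous to timeoutWatcher
}

func (p *perKeyTimer) start(key string, d time.Duration) {
	stop := make(chan struct{})
	p.timers.Store(key, stop)
	go func() {
		select {
		case <-time.After(d):
			p.timeouts <- key // like notifyTimeoutWatcher
		case <-stop:
			// cancelled, like removeTimers / stopIfExsit
		}
	}()
}

func (p *perKeyTimer) stop(key string) {
	if s, ok := p.timers.LoadAndDelete(key); ok {
		close(s.(chan struct{}))
	}
}

func main() {
	p := &perKeyTimer{timeouts: make(chan string, 1)}
	p.start("channel-1", 10*time.Millisecond)
	fmt.Println("timed out:", <-p.timeouts) // fires because nothing stopped it
}
```

Note that `startOne` below interprets `TimeoutTs` as Unix nanoseconds and reconstructs the deadline with `time.Unix(0, timeoutTs)`, while `StartTs` elsewhere in this diff is Unix seconds.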
+// startOne can write ToWatch or ToRelease states.
+func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channelName string, nodeID UniqueID, timeoutTs int64) {
+	if timeoutTs == 0 {
+		log.Debug("zero timeoutTs, skip starting timer",
+			zap.String("watch state", watchState.String()),
+			zap.Int64("nodeID", nodeID),
+			zap.String("channel name", channelName),
+		)
+		return
+	}
+	stop := make(chan struct{})
+	c.runningTimers.Store(channelName, stop)
+	timeoutT := time.Unix(0, timeoutTs)
+	go func() {
+		log.Debug("timer started",
+			zap.String("watch state", watchState.String()),
+			zap.Int64("nodeID", nodeID),
+			zap.String("channel name", channelName),
+			zap.Time("timeout time", timeoutT))
+		select {
+		case <-time.NewTimer(time.Until(timeoutT)).C:
+			log.Info("timeout and stop timer: wait for channel ACK timeout",
+				zap.String("state", watchState.String()),
+				zap.String("channel name", channelName),
+				zap.Time("timeout time", timeoutT))
+			ackType := getAckType(watchState)
+			c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID})
+		case <-stop:
+			log.Debug("stop timer before timeout",
+				zap.String("state", watchState.String()),
+				zap.String("channel name", channelName),
+				zap.Time("timeout time", timeoutT))
+		}
+	}()
+}
+
+func (c *channelStateTimer) notifyTimeoutWatcher(e *ackEvent) {
+	c.timeoutWatcher <- e
+}
+
+func (c *channelStateTimer) removeTimers(channels []string) {
+	for _, channel := range channels {
+		if stop, ok := c.runningTimers.LoadAndDelete(channel); ok {
+			close(stop.(chan struct{}))
+		}
+	}
+}
+
+func (c *channelStateTimer) stopIfExsit(e *ackEvent) {
+	stop, ok := c.runningTimers.LoadAndDelete(e.channelName)
+	if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
+		close(stop.(chan struct{}))
+	}
+}
+
+func parseWatchInfo(key string, data []byte) (*datapb.ChannelWatchInfo, error) {
+	watchInfo := datapb.ChannelWatchInfo{}
+	if err := proto.Unmarshal(data, &watchInfo); err != nil {
+		return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, key: %s, err: %v", key, err)
+	}
+
+	if watchInfo.Vchan == nil {
+		return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo, key: %s", key)
+	}
+
+	return &watchInfo, nil
+}
+
+// parseAckEvent converts a ChannelWatchInfo loaded from etcd into an ackEvent
+func parseAckEvent(nodeID UniqueID, info *datapb.ChannelWatchInfo) *ackEvent {
+	ret := &ackEvent{
+		ackType:     getAckType(info.GetState()),
+		channelName: info.GetVchan().GetChannelName(),
+		nodeID:      nodeID,
+	}
+	return ret
+}
+
+func getAckType(state datapb.ChannelWatchState) ackType {
+	switch state {
+	case datapb.ChannelWatchState_WatchSuccess, datapb.ChannelWatchState_Complete:
+		return watchSuccessAck
+	case datapb.ChannelWatchState_WatchFailure:
+		return watchFailAck
+	case datapb.ChannelWatchState_ReleaseSuccess:
+		return releaseSuccessAck
+	case datapb.ChannelWatchState_ReleaseFailure:
+		return releaseFailAck
+	case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete: // unchanged watch states generate timeout acks
+		return watchTimeoutAck
+	case datapb.ChannelWatchState_ToRelease: // unchanged watch states generate timeout acks
+		return releaseTimeoutAck
+	default:
+		return invalidAck
+	}
+}
diff --git a/internal/datacoord/channel_checker_test.go b/internal/datacoord/channel_checker_test.go
new file mode 100644
index 0000000000000..2aeabf26130e8
--- /dev/null
+++ b/internal/datacoord/channel_checker_test.go
@@ -0,0 +1,198 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +import ( + "path" + "testing" + "time" + + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/golang/protobuf/proto" +) + +func TestChannelStateTimer(t *testing.T) { + kv := getMetaKv(t) + defer kv.Close() + + prefix := Params.DataCoordCfg.ChannelWatchSubPath + + t.Run("test getWatcher", func(t *testing.T) { + timer := newChannelStateTimer(kv) + + etcdCh, timeoutCh := timer.getWatchers(prefix) + assert.NotNil(t, etcdCh) + assert.NotNil(t, timeoutCh) + + timer.getWatchers(prefix) + assert.NotNil(t, etcdCh) + assert.NotNil(t, timeoutCh) + }) + + t.Run("test loadAllChannels", func(t *testing.T) { + defer kv.RemoveWithPrefix("") + timer := newChannelStateTimer(kv) + timer.loadAllChannels(1) + + validWatchInfo := datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{}, + StartTs: time.Now().Unix(), + State: datapb.ChannelWatchState_ToWatch, + TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(), + } + validData, err := proto.Marshal(&validWatchInfo) + require.NoError(t, err) + + prefix = Params.DataCoordCfg.ChannelWatchSubPath + prepareKvs := map[string]string{ + path.Join(prefix, "1/channel-1"): "invalidWatchInfo", + path.Join(prefix, "1/channel-2"): string(validData), + path.Join(prefix, "2/channel-3"): string(validData), + } + + err = kv.MultiSave(prepareKvs) + require.NoError(t, err) + + tests := []struct { + inNodeID UniqueID + outLen int + }{ + {1, 1}, + {2, 1}, + {3, 0}, + } + + for _, test := range tests { + infos, err := timer.loadAllChannels(test.inNodeID) + assert.NoError(t, err) + assert.Equal(t, test.outLen, len(infos)) + } + }) + + t.Run("test startOne", func(t *testing.T) { + normalTimeoutTs := time.Now().Add(20 * time.Second).UnixNano() + nowTimeoutTs := time.Now().UnixNano() + zeroTimeoutTs := int64(0) + tests := []struct { + channelName string + timeoutTs int64 + + description string + }{ + {"channel-1", normalTimeoutTs, "test stop"}, + {"channel-2", nowTimeoutTs, "test timeout"}, + {"channel-3", zeroTimeoutTs, "not start"}, + } + + timer := newChannelStateTimer(kv) + + _, timeoutCh := timer.getWatchers(prefix) + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + timer.startOne(datapb.ChannelWatchState_ToWatch, test.channelName, 1, test.timeoutTs) + if test.timeoutTs == nowTimeoutTs { + e := <-timeoutCh + assert.Equal(t, watchTimeoutAck, e.ackType) + assert.Equal(t, test.channelName, e.channelName) + } else { + timer.stopIfExsit(&ackEvent{watchSuccessAck, test.channelName, 1}) + } + }) + } + + timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-remove", 1, normalTimeoutTs) + timer.removeTimers([]string{"channel-remove"}) + }) +} + +func TestChannelStateTimer_parses(t *testing.T) { + const ( + ValidTest = true + InValidTest = false 
+ ) + + t.Run("test parseWatchInfo", func(t *testing.T) { + validWatchInfo := datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{}, + StartTs: time.Now().Unix(), + State: datapb.ChannelWatchState_ToWatch, + TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(), + } + validData, err := proto.Marshal(&validWatchInfo) + require.NoError(t, err) + + invalidDataUnableToMarshal := []byte("invalidData") + + invalidWatchInfoNilVchan := validWatchInfo + invalidWatchInfoNilVchan.Vchan = nil + invalidDataNilVchan, err := proto.Marshal(&invalidWatchInfoNilVchan) + require.NoError(t, err) + + tests := []struct { + inKey string + inData []byte + + isValid bool + description string + }{ + {"key", validData, ValidTest, "test with valid watchInfo"}, + {"key", invalidDataUnableToMarshal, InValidTest, "test with watchInfo unable to marshal"}, + {"key", invalidDataNilVchan, InValidTest, "test with watchInfo with nil Vchan"}, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + info, err := parseWatchInfo(test.inKey, test.inData) + if test.isValid { + assert.NoError(t, err) + assert.NotNil(t, info) + assert.Equal(t, info.GetState(), validWatchInfo.GetState()) + assert.Equal(t, info.GetStartTs(), validWatchInfo.GetStartTs()) + assert.Equal(t, info.GetTimeoutTs(), validWatchInfo.GetTimeoutTs()) + } else { + assert.Nil(t, info) + assert.Error(t, err) + } + }) + } + }) + + t.Run("test getAckType", func(t *testing.T) { + tests := []struct { + inState datapb.ChannelWatchState + outAckType ackType + }{ + {datapb.ChannelWatchState_WatchSuccess, watchSuccessAck}, + {datapb.ChannelWatchState_WatchFailure, watchFailAck}, + {datapb.ChannelWatchState_ToWatch, watchTimeoutAck}, + {datapb.ChannelWatchState_Uncomplete, watchTimeoutAck}, + {datapb.ChannelWatchState_ReleaseSuccess, releaseSuccessAck}, + {datapb.ChannelWatchState_ReleaseFailure, releaseFailAck}, + {datapb.ChannelWatchState_ToRelease, releaseTimeoutAck}, + {100, invalidAck}, + } + + for _, test := range tests { + assert.Equal(t, test.outAckType, getAckType(test.inState)) + } + + }) +} diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index f099e418bbf87..04712c5995568 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -26,6 +26,10 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/util/logutil" + + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "stathat.com/c/consistent" ) @@ -47,6 +51,10 @@ type ChannelManager struct { reassignPolicy ChannelReassignPolicy bgChecker ChannelBGChecker msgstreamFactory msgstream.Factory + + stateChecker channelStateChecker + stopChecker context.CancelFunc + stateTimer *channelStateTimer } type channel struct { @@ -69,16 +77,21 @@ func withMsgstreamFactory(f msgstream.Factory) ChannelManagerOpt { return func(c *ChannelManager) { c.msgstreamFactory = f } } +func withStateChecker() ChannelManagerOpt { + return func(c *ChannelManager) { c.stateChecker = c.watchChannelStatesLoop } +} + // NewChannelManager creates and returns a new ChannelManager instance. 
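The constructor below now takes a `kv.MetaKv` and owns a `stateTimer`; the checker loop is only wired in when the `withStateChecker()` option is passed, and `Startup` later stores a `context.CancelFunc` in `stopChecker` to shut it down. A minimal, self-contained sketch of that wiring pattern, with illustrative names rather than the diff's actual API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type manager struct {
	checker func(context.Context) // like ChannelManager.stateChecker
	stop    context.CancelFunc    // like ChannelManager.stopChecker
}

type opt func(*manager)

// withChecker mirrors withStateChecker: it binds the loop method as a field,
// so the loop only runs for managers constructed with this option.
func withChecker() opt {
	return func(m *manager) { m.checker = m.loop }
}

func (m *manager) loop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("checker quit")
			return
		case <-time.After(5 * time.Millisecond):
			// poll / watch state here
		}
	}
}

// startup mirrors ChannelManager.Startup: start the loop only if wired.
func (m *manager) startup(ctx context.Context) {
	if m.checker != nil {
		cctx, cancel := context.WithCancel(ctx)
		m.stop = cancel
		go m.checker(cctx)
	}
}

func main() {
	m := &manager{}
	for _, o := range []opt{withChecker()} {
		o(m)
	}
	m.startup(context.Background())
	time.Sleep(10 * time.Millisecond)
	m.stop()                          // what shutdown would call
	time.Sleep(10 * time.Millisecond) // give the goroutine time to exit
}
```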
 func NewChannelManager(
-	kv kv.TxnKV,
+	kv kv.MetaKv, // used as both TxnKV and MetaKv
 	h Handler,
 	options ...ChannelManagerOpt,
 ) (*ChannelManager, error) {
 	c := &ChannelManager{
-		h:       h,
-		factory: NewChannelPolicyFactoryV1(kv),
-		store:   NewChannelStore(kv),
+		h:          h,
+		factory:    NewChannelPolicyFactoryV1(kv),
+		store:      NewChannelStore(kv),
+		stateTimer: newChannelStateTimer(kv),
 	}
 
 	if err := c.store.Reload(); err != nil {
@@ -98,7 +111,7 @@ func NewChannelManager(
 }
 
 // Startup adjusts the channel store according to current cluster states.
-func (c *ChannelManager) Startup(nodes []int64) error {
+func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
 	channels := c.store.GetNodesChannels()
 	// Retrieve the current old nodes.
 	oNodes := make([]int64, 0, len(channels))
@@ -106,6 +119,11 @@
 		oNodes = append(oNodes, c.NodeID)
 	}
 
+	// Process watch states for old nodes.
+	if err := c.checkOldNodes(oNodes); err != nil {
+		return err
+	}
+
 	// Add new online nodes to the cluster.
 	newOnLines := c.getNewOnLines(nodes, oNodes)
 	for _, n := range newOnLines {
@@ -125,6 +143,13 @@
 	// Unwatch and drop channel with drop flag.
 	c.unwatchDroppedChannels()
 
+	if c.stateChecker != nil {
+		ctx1, cancel := context.WithCancel(ctx)
+		c.stopChecker = cancel
+		go c.stateChecker(ctx1)
+		log.Debug("starting etcd states checker")
+	}
+
 	log.Info("cluster start up",
 		zap.Any("nodes", nodes),
 		zap.Any("oNodes", oNodes),
@@ -133,6 +158,50 @@
 	return nil
 }
 
+// checkOldNodes processes the existing watch channels when starting up:
+//   ToWatch        get startTs and timeoutTs, start timer
+//   WatchSuccess   ignore
+//   WatchFail      release the channel (set ToRelease)
+//   ToRelease      get startTs and timeoutTs, start timer
+//   ReleaseSuccess remove
+//   ReleaseFail    clean up and remove
+func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
+	for _, nodeID := range nodes {
+		watchInfos, err := c.stateTimer.loadAllChannels(nodeID)
+		if err != nil {
+			return err
+		}
+
+		for _, info := range watchInfos {
+			channelName := info.GetVchan().GetChannelName()
+
+			switch info.GetState() {
+			case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete:
+				c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, info.GetTimeoutTs())
+
+			case datapb.ChannelWatchState_WatchFailure:
+				if err := c.Release(nodeID, channelName); err != nil {
+					return err
+				}
+
+			case datapb.ChannelWatchState_ToRelease:
+				c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, info.GetTimeoutTs())
+
+			case datapb.ChannelWatchState_ReleaseSuccess:
+				if err := c.toDelete(nodeID, channelName); err != nil {
+					return err
+				}
+
+			case datapb.ChannelWatchState_ReleaseFailure:
+				if err := c.cleanUpAndDelete(nodeID, channelName); err != nil {
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
+
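`loadAllChannels` and `checkOldNodes` rely on the watch-info key layout `[prefix]/{nodeID}/{channelName}` (per MEP #7, referenced later in `watchChannelStatesLoop`). A small sketch of building and parsing such keys; the parser is a hypothetical stand-in for `parseNodeKey`, which this diff calls but does not define:

```go
package main

import (
	"fmt"
	"path"
	"strconv"
	"strings"
)

// buildKey composes the etcd key the same way the diff does:
// path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName).
func buildKey(prefix string, nodeID int64, channel string) string {
	return path.Join(prefix, strconv.FormatInt(nodeID, 10), channel)
}

// parseNodeID is a hypothetical counterpart of parseNodeKey: it pulls the
// node ID out of the second-to-last path segment.
func parseNodeID(key string) (int64, error) {
	parts := strings.Split(key, "/")
	if len(parts) < 2 {
		return 0, fmt.Errorf("invalid key: %s", key)
	}
	return strconv.ParseInt(parts[len(parts)-2], 10, 64)
}

func main() {
	key := buildKey("channelwatch", 119, "channel-1")
	fmt.Println(key) // channelwatch/119/channel-1
	nodeID, err := parseNodeID(key)
	fmt.Println(nodeID, err) // 119 <nil>
}
```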
 // unwatchDroppedChannels removes channels that are marked to drop.
 func (c *ChannelManager) unwatchDroppedChannels() {
 	nodeChannels := c.store.GetNodesChannels()
@@ -224,20 +293,21 @@
 func (c *ChannelManager) AddNode(nodeID int64) error {
 	c.store.Add(nodeID)
 
+	// the default registerPolicy doesn't reassign channels that are already assigned
 	updates := c.registerPolicy(c.store, nodeID)
+	if len(updates) <= 0 {
+		return nil
+	}
+
 	log.Info("register node",
 		zap.Int64("registered node", nodeID),
 		zap.Array("updates", updates))
 
-	for _, op := range updates {
-		if op.Type == Add {
-			c.fillChannelWatchInfo(op)
-		}
-	}
-	return c.store.Update(updates)
+	return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
 }
 
 // DeleteNode deletes the node from the cluster.
+// It deletes the nodeID's watchInfos in etcd and reassigns its channels to other nodes.
 func (c *ChannelManager) DeleteNode(nodeID int64) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -254,12 +324,7 @@
 		zap.Int64("unregistered node", nodeID),
 		zap.Array("updates", updates))
 
-	for _, v := range updates {
-		if v.Type == Add {
-			c.fillChannelWatchInfo(v)
-		}
-	}
-	if err := c.store.Update(updates); err != nil {
+	if err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch); err != nil {
 		return err
 	}
 	_, err := c.store.Delete(nodeID)
@@ -312,24 +377,16 @@ func (c *ChannelManager) Watch(ch *channel) error {
 	if len(updates) == 0 {
 		return nil
 	}
-	log.Info("watch channel",
+	log.Info("try to update channel watch info with ToWatch state",
 		zap.Any("channel", ch),
 		zap.Array("updates", updates))
 
-	for _, v := range updates {
-		if v.Type == Add {
-			c.fillChannelWatchInfo(v)
-		}
-	}
-	err := c.store.Update(updates)
+	err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
 	if err != nil {
-		log.Error("ChannelManager RWChannelStore update failed", zap.Int64("collectionID", ch.CollectionID),
-			zap.String("channelName", ch.Name), zap.Error(err))
-		return err
+		log.Warn("fail to update channel watch info with ToWatch state",
+			zap.Any("channel", ch), zap.Array("updates", updates), zap.Error(err))
 	}
-	log.Info("ChannelManager RWChannelStore update success", zap.Int64("collectionID", ch.CollectionID),
-		zap.String("channelName", ch.Name))
-	return nil
+	return err
 }
 
 // fillChannelWatchInfo updates the channel op by filling in channel watch info.
@@ -346,6 +403,31 @@
 	}
 }
 
+// fillChannelWatchInfoWithState updates the channel op by filling in channel watch info
+// with the given state, and returns the names of the channels that got timers.
+func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state datapb.ChannelWatchState) []string {
+	var channelsWithTimer = []string{}
+	startTs := time.Now().Unix()
+	timeoutTs := time.Now().Add(maxWatchDuration).UnixNano()
+	for _, ch := range op.Channels {
+		vcInfo := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
+		info := &datapb.ChannelWatchInfo{
+			Vchan:     vcInfo,
+			StartTs:   startTs,
+			State:     state,
+			TimeoutTs: timeoutTs,
+		}
+
+		// Only set timer for watchInfo not from bufferID
+		if op.NodeID != bufferID {
+			c.stateTimer.startOne(state, ch.Name, op.NodeID, timeoutTs)
+			channelsWithTimer = append(channelsWithTimer, ch.Name)
+		}
+
+		op.ChannelWatchInfos = append(op.ChannelWatchInfos, info)
+	}
+	return channelsWithTimer
+}
+
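One detail worth flagging in `fillChannelWatchInfoWithState` above: `StartTs` is recorded in Unix seconds while `TimeoutTs` is Unix nanoseconds, and `startOne` reconstructs the deadline with `time.Unix(0, timeoutTs)`. A quick, self-contained check of that round-trip; the 20 s duration is an assumption mirroring the value the tests use, not the diff's `maxWatchDuration` constant itself:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const maxWatchDuration = 20 * time.Second // assumption: matches the 20s used in the tests

	startTs := time.Now().Unix()                             // seconds, like StartTs
	timeoutTs := time.Now().Add(maxWatchDuration).UnixNano() // nanoseconds, like TimeoutTs

	// startOne's reconstruction of the deadline:
	deadline := time.Unix(0, timeoutTs)

	fmt.Println("start:   ", time.Unix(startTs, 0))
	fmt.Println("deadline:", deadline)
	fmt.Println("until deadline:", time.Until(deadline).Round(time.Second)) // ~20s
}
```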
 // GetChannels gets channels info of registered nodes.
 func (c *ChannelManager) GetChannels() []*NodeChannelInfo {
 	c.mu.RLock()
@@ -438,3 +520,270 @@ func (c *ChannelManager) findChannel(channelName string) (int64, *channel) {
 	}
 	return 0, nil
 }
+
+type ackType = int
+
+const (
+	invalidAck = iota
+	watchSuccessAck
+	watchFailAck
+	watchTimeoutAck
+	releaseSuccessAck
+	releaseFailAck
+	releaseTimeoutAck
+)
+
+type ackEvent struct {
+	ackType     ackType
+	channelName string
+	nodeID      UniqueID
+}
+
+func (c *ChannelManager) updateWithTimer(updates ChannelOpSet, state datapb.ChannelWatchState) error {
+	var channelsWithTimer = []string{}
+	for _, op := range updates {
+		if op.Type == Add {
+			channelsWithTimer = append(channelsWithTimer, c.fillChannelWatchInfoWithState(op, state)...)
+		}
+	}
+
+	err := c.store.Update(updates)
+	if err != nil {
+		log.Warn("fail to update", zap.Array("updates", updates))
+		c.stateTimer.removeTimers(channelsWithTimer)
+	}
+	return err
+}
+
+func (c *ChannelManager) processAck(e *ackEvent) {
+	c.stateTimer.stopIfExsit(e)
+
+	switch e.ackType {
+	case invalidAck:
+		log.Warn("detected invalid Ack", zap.String("channel name", e.channelName))
+
+	case watchSuccessAck:
+		log.Info("datanode successfully watched channel", zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
+
+	case watchFailAck, watchTimeoutAck: // failure acks from toWatch
+		err := c.Release(e.nodeID, e.channelName)
+		if err != nil {
+			log.Warn("fail to set channels to release for watch failure ACKs",
+				zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
+		}
+
+	case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease
+		err := c.cleanUpAndDelete(e.nodeID, e.channelName)
+		if err != nil {
+			log.Warn("fail to clean and delete channels for release failure ACKs",
+				zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
+		}
+
+	case releaseSuccessAck:
+		err := c.toDelete(e.nodeID, e.channelName)
+		if err != nil {
+			log.Warn("fail to respond to release success ACK",
+				zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
+		}
+	}
+}
+
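Taken together, `processAck` above and `getAckType` in channel_checker.go form a small state machine: watch failures and watch timeouts funnel into `Release` (which writes `ToRelease`), release failures and timeouts into `cleanUpAndDelete`, and a successful release into `toDelete`. A condensed restatement of that mapping as a pure function, purely illustrative (the names mirror the diff but the function is not part of it), which is one way the dispatch could be unit-tested in isolation:

```go
package main

import "fmt"

type ackType int

const (
	invalidAck ackType = iota
	watchSuccessAck
	watchFailAck
	watchTimeoutAck
	releaseSuccessAck
	releaseFailAck
	releaseTimeoutAck
)

// nextAction condenses the switch in processAck into a pure mapping.
func nextAction(t ackType) string {
	switch t {
	case watchSuccessAck:
		return "log only"
	case watchFailAck, watchTimeoutAck:
		return "Release (write ToRelease)"
	case releaseFailAck, releaseTimeoutAck:
		return "cleanUpAndDelete (unsubscribe, then reassign or drop)"
	case releaseSuccessAck:
		return "toDelete (reassign or drop)"
	default:
		return "warn: invalid ack"
	}
}

func main() {
	for t := invalidAck; t <= releaseTimeoutAck; t++ {
		fmt.Printf("ack %d -> %s\n", t, nextAction(t))
	}
}
```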
+// cleanUpAndDelete tries to clean up the datanode's subscription, and then deletes the channel watch info.
+func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName)
+	if chToCleanUp == nil {
+		return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID)
+	}
+
+	if c.msgstreamFactory == nil {
+		log.Warn("msgstream factory is not set, unable to clean up topics")
+	} else {
+		subName := buildSubName(chToCleanUp.CollectionID, nodeID)
+		err := c.unsubscribe(subName, channelName)
+		if err != nil {
+			log.Warn("failed to unsubscribe topic", zap.String("subscription", subName), zap.String("channel name", channelName), zap.Error(err))
+		}
+	}
+
+	if !c.isMarkedDrop(channelName) {
+		reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}
+
+		// the reassign policy won't choose the same node for a reassignment of a channel
+		updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
+		if len(updates) <= 0 {
+			log.Warn("fail to reassign channel to other nodes, add channel to buffer", zap.String("channel name", channelName))
+			updates.Add(bufferID, []*channel{chToCleanUp})
+		}
+
+		err := c.remove(nodeID, chToCleanUp)
+		if err != nil {
+			return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
+		}
+
+		log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))
+
+		return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
+	}
+
+	err := c.remove(nodeID, chToCleanUp)
+	if err != nil {
+		return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
+	}
+
+	log.Debug("try to cleanup removal flag", zap.String("channel name", channelName))
+	c.h.FinishDropChannel(channelName)
+
+	return nil
+}
+
+type channelStateChecker func(context.Context)
+
+func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
+	defer logutil.LogPanic()
+
+	// REF MEP#7 watchInfo paths are organized as: [prefix]/channel/{node_id}/{channel_name}
+	watchPrefix := Params.DataCoordCfg.ChannelWatchSubPath
+	etcdWatcher, timeoutWatcher := c.stateTimer.getWatchers(watchPrefix)
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("watch etcd loop quit")
+			return
+		case ackEvent := <-timeoutWatcher:
+			log.Debug("receive timeout acks from state watcher",
+				zap.Int64("nodeID", ackEvent.nodeID), zap.String("channel name", ackEvent.channelName))
+			c.processAck(ackEvent)
+		case event := <-etcdWatcher:
+			if event.Canceled {
+				log.Warn("watch channel canceled", zap.Error(event.Err()))
+				// https://github.com/etcd-io/etcd/issues/8980
+				if event.Err() == v3rpc.ErrCompacted {
+					go c.watchChannelStatesLoop(ctx)
+					return
+				}
+				// if the watch loop returns because the event was canceled, the datacoord is not functional anymore
+				log.Panic("datacoord is not functional for event canceled")
+			}
+
+			for _, evt := range event.Events {
+				if evt.Type == clientv3.EventTypeDelete {
+					continue
+				}
+				key := string(evt.Kv.Key)
+				watchInfo, err := parseWatchInfo(key, evt.Kv.Value)
+				if err != nil {
+					log.Warn("fail to parse watch info", zap.Error(err))
+					continue
+				}
+
+				// ignore these states
+				state := watchInfo.GetState()
+				if state == datapb.ChannelWatchState_ToWatch ||
+					state == datapb.ChannelWatchState_ToRelease ||
+					state == datapb.ChannelWatchState_Uncomplete {
+					continue
+				}
+
+				nodeID, err := parseNodeKey(key)
+				if err != nil {
+					log.Warn("fail to parse node from key", zap.String("key", key), zap.Error(err))
+					continue
+				}
+
+				ackEvent := parseAckEvent(nodeID, watchInfo)
+				c.processAck(ackEvent)
+			}
+		}
+	}
+}
+
+// Release writes the ToRelease watch state for a channel
+func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	toReleaseChannel := c.getChannelByNodeAndName(nodeID, channelName)
+	if toReleaseChannel == nil {
+		return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName)
+	}
+
+	toReleaseUpdates := getReleaseOp(nodeID, toReleaseChannel)
+	err := c.updateWithTimer(toReleaseUpdates, datapb.ChannelWatchState_ToRelease)
+	if err != nil {
+		log.Debug("fail to update to release with timer", zap.Array("to release updates", toReleaseUpdates))
+	}
+
+	return err
+}
+
+// toDelete removes channel assignment from a datanode
+func (c *ChannelManager) toDelete(nodeID UniqueID, channelName string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	ch := c.getChannelByNodeAndName(nodeID, channelName)
+	if ch == nil {
+		return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName)
+	}
+
+	if !c.isMarkedDrop(channelName) {
+		reallocates := &NodeChannelInfo{nodeID, []*channel{ch}}
+
+		// the reassign policy won't choose the same node for a reassignment of a channel
+		updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
+		if len(updates) <= 0 {
+			log.Warn("fail to reassign channel to other nodes, add to the buffer", zap.String("channel name", channelName))
+			updates.Add(bufferID, []*channel{ch})
+		}
+
+		err := c.remove(nodeID, ch)
+		if err != nil {
+			return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
+		}
+
+		log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))
+
+		return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
+	}
+
+	err := c.remove(nodeID, ch)
+	if err != nil {
+		return err
+	}
+
+	log.Debug("try to cleanup removal flag", zap.String("channel name", channelName))
+	c.h.FinishDropChannel(channelName)
+
+	log.Info("removed channel assignment", zap.Any("channel", ch))
+	return nil
+}
+
+func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) *channel {
+	var ret *channel
+
+	nodeChannelInfo := c.store.GetNode(nodeID)
+	if nodeChannelInfo == nil {
+		return nil
+	}
+
+	for _, channel := range nodeChannelInfo.Channels {
+		if channel.Name == channelName {
+			ret = channel
+			break
+		}
+	}
+	return ret
+}
+
+func (c *ChannelManager) isMarkedDrop(channelName string) bool {
+	return c.h.CheckShouldDropChannel(channelName)
+}
+
+func getReleaseOp(nodeID UniqueID, ch *channel) ChannelOpSet {
+	var op ChannelOpSet
+	op.Add(nodeID, []*channel{ch})
+	return op
+}
diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go
index cd6942a9f9902..1a212faec084d 100644
--- a/internal/datacoord/channel_manager_test.go
+++ b/internal/datacoord/channel_manager_test.go
@@ -17,19 +17,709 @@
 package datacoord
 
 import (
+	"context"
+	"os"
+	"path"
+	"strconv"
+	"sync"
 	"testing"
+	"time"
 
-	memkv "github.com/milvus-io/milvus/internal/kv/mem"
+	"github.com/golang/protobuf/proto"
+	"github.com/milvus-io/milvus/internal/kv"
+	"github.com/milvus-io/milvus/internal/mq/msgstream"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"stathat.com/c/consistent"
 )
 
-func TestReload(t *testing.T) {
+func checkWatchInfoWithState(t *testing.T, kv kv.MetaKv, state datapb.ChannelWatchState, nodeID UniqueID, channelName
string, collectionID UniqueID) { + prefix := Params.DataCoordCfg.ChannelWatchSubPath + + info, err := kv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)) + assert.NoError(t, err) + assert.NotNil(t, info) + + watchInfo, err := parseWatchInfo("fakeKey", []byte(info)) + assert.NoError(t, err) + assert.Equal(t, watchInfo.GetState(), state) + assert.Equal(t, watchInfo.Vchan.GetChannelName(), channelName) + assert.Equal(t, watchInfo.Vchan.GetCollectionID(), collectionID) +} + +func getOpsWithWatchInfo(nodeID UniqueID, ch *channel) ChannelOpSet { + var ops ChannelOpSet + ops.Add(nodeID, []*channel{ch}) + + for _, op := range ops { + op.ChannelWatchInfos = []*datapb.ChannelWatchInfo{{}} + } + return ops +} + +func TestChannelManager_StateTransfer(t *testing.T) { + metakv := getMetaKv(t) + defer func() { + metakv.RemoveWithPrefix("") + metakv.Close() + }() + + p := "/tmp/milvus_ut/rdb_data" + os.Setenv("ROCKSMQ_PATH", p) + + prefix := Params.DataCoordCfg.ChannelWatchSubPath + + var ( + collectionID = UniqueID(9) + nodeID = UniqueID(119) + channel1 = "channel1" + ) + + getWatchInfoWithState := func(state datapb.ChannelWatchState) *datapb.ChannelWatchInfo { + return &datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: channel1, + }, + State: state, + } + } + + t.Run("toWatch-WatchSuccess", func(t *testing.T) { + metakv.RemoveWithPrefix("") + ctx, cancel := context.WithCancel(context.TODO()) + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + chManager.watchChannelStatesLoop(ctx) + wg.Done() + }() + + chManager.AddNode(nodeID) + chManager.Watch(&channel{channel1, collectionID}) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchSuccess)) + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) + require.NoError(t, err) + + // TODO: cancel could arrive earlier than etcd action watch channel + // if etcd has poor response latency. + time.Sleep(time.Second) + cancel() + wg.Wait() + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1, collectionID) + }) + + t.Run("ToWatch-WatchFail-ToRelease", func(t *testing.T) { + metakv.RemoveWithPrefix("") + ctx, cancel := context.WithCancel(context.TODO()) + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + chManager.watchChannelStatesLoop(ctx) + wg.Done() + }() + + chManager.AddNode(nodeID) + chManager.Watch(&channel{channel1, collectionID}) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure)) + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) + require.NoError(t, err) + + // TODO: cancel could arrive earlier than etcd action watch channel + // if etcd has poor response latency. 
+ time.Sleep(time.Second) + cancel() + wg.Wait() + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) + }) + + t.Run("ToWatch-Timeout", func(t *testing.T) { + metakv.RemoveWithPrefix("") + ctx, cancel := context.WithCancel(context.TODO()) + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + chManager.watchChannelStatesLoop(ctx) + wg.Done() + }() + + chManager.AddNode(nodeID) + chManager.Watch(&channel{channel1, collectionID}) + + // simulating timeout behavior of startOne, cuz 20s is a long wait + e := &ackEvent{ + ackType: watchTimeoutAck, + channelName: channel1, + nodeID: nodeID, + } + chManager.stateTimer.notifyTimeoutWatcher(e) + chManager.stateTimer.stopIfExsit(e) + + // TODO: cancel could arrive earlier than etcd action watch channel + // if etcd has poor response latency. + time.Sleep(time.Second) + cancel() + wg.Wait() + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) + }) + + t.Run("toRelease-ReleaseSuccess-Delete-reassign-ToWatch", func(t *testing.T) { + var oldNode = UniqueID(120) + + metakv.RemoveWithPrefix("") + ctx, cancel := context.WithCancel(context.TODO()) + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + chManager.watchChannelStatesLoop(ctx) + wg.Done() + }() + + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{ + {channel1, collectionID}, + }}, + }, + } + + err = chManager.Release(nodeID, channel1) + assert.NoError(t, err) + chManager.AddNode(oldNode) + + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess)) + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) + require.NoError(t, err) + + // TODO: cancel could arrive earlier than etcd action watch channel + // if etcd has poor response latency. 
+ time.Sleep(time.Second) + cancel() + wg.Wait() + + w, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) + assert.Error(t, err) + assert.Empty(t, w) + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID) + }) + + t.Run("toRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch", func(t *testing.T) { + var oldNode = UniqueID(121) + + metakv.RemoveWithPrefix("") + ctx, cancel := context.WithCancel(context.TODO()) + factory := msgstream.NewRmsFactory() + _, err := factory.NewMsgStream(context.TODO()) + require.NoError(t, err) + chManager, err := NewChannelManager(metakv, newMockHandler(), withMsgstreamFactory(factory)) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + chManager.watchChannelStatesLoop(ctx) + wg.Done() + }() + + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{ + {channel1, collectionID}, + }}, + }, + } + + err = chManager.Release(nodeID, channel1) + assert.NoError(t, err) + chManager.AddNode(oldNode) + + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseFailure)) + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) + require.NoError(t, err) + + // TODO: cancel could arrive earlier than etcd action watch channel + // if etcd has poor response latency. + time.Sleep(time.Second) + cancel() + wg.Wait() + + w, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) + assert.Error(t, err) + assert.Empty(t, w) + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID) + }) + +} + +func TestChannelManager(t *testing.T) { + metakv := getMetaKv(t) + defer func() { + metakv.RemoveWithPrefix("") + metakv.Close() + }() + + prefix := Params.DataCoordCfg.ChannelWatchSubPath + t.Run("test AddNode", func(t *testing.T) { + // Note: this test is based on the default registerPolicy + defer metakv.RemoveWithPrefix("") + var ( + collectionID = UniqueID(8) + nodeID, nodeToAdd = UniqueID(118), UniqueID(811) + channel1, channel2 = "channel1", "channel2" + ) + + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{ + {channel1, collectionID}, + {channel2, collectionID}, + }}, + }, + } + + err = chManager.AddNode(nodeToAdd) + assert.NoError(t, err) + + chInfo := chManager.store.GetNode(nodeID) + assert.Equal(t, 2, len(chInfo.Channels)) + chInfo = chManager.store.GetNode(nodeToAdd) + assert.Equal(t, 0, len(chInfo.Channels)) + + err = chManager.Watch(&channel{"channel-3", collectionID}) + assert.NoError(t, err) + + chInfo = chManager.store.GetNode(nodeToAdd) + assert.Equal(t, 1, len(chInfo.Channels)) + chManager.stateTimer.removeTimers([]string{"channel-3"}) + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID) + }) + + t.Run("test Watch", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + var ( + collectionID = UniqueID(7) + nodeID = UniqueID(117) + bufferCh = "bufferID" + chanToAdd = "new-channel-watch" + ) + + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + + err = chManager.Watch(&channel{bufferCh, collectionID}) + assert.NoError(t, err) + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 
bufferID, bufferCh, collectionID) + + chManager.store.Add(nodeID) + err = chManager.Watch(&channel{chanToAdd, collectionID}) + assert.NoError(t, err) + chManager.stateTimer.removeTimers([]string{chanToAdd}) + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, chanToAdd, collectionID) + }) + + t.Run("test Release", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + var ( + collectionID = UniqueID(4) + nodeID, invalidNodeID = UniqueID(114), UniqueID(999) + channelName, invalidChName = "to-release", "invalid-to-release" + ) + + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{{channelName, collectionID}}}, + }, + } + + err = chManager.Release(invalidNodeID, invalidChName) + assert.Error(t, err) + + err = chManager.Release(nodeID, channelName) + assert.NoError(t, err) + chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, channelName, nodeID}) + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID) + }) + + t.Run("test toDelete", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + var collectionID = UniqueID(5) + + tests := []struct { + isvalid bool + nodeID UniqueID + chName string + }{ + {true, UniqueID(125), "normal-chan"}, + {true, UniqueID(115), "to-delete-chan"}, + {false, UniqueID(9), "invalid-chan"}, + } + + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + + // prepare tests + for _, test := range tests { + if test.isvalid { + chManager.store.Add(test.nodeID) + ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID}) + err = chManager.store.Update(ops) + require.NoError(t, err) + + info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName)) + require.NoError(t, err) + require.NotNil(t, info) + } + } + + remainTest, reassignTest := tests[0], tests[1] + err = chManager.toDelete(reassignTest.nodeID, reassignTest.chName) + assert.NoError(t, err) + chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) + + // test no nodes are removed from store + nodesID := chManager.store.GetNodes() + assert.Equal(t, 2, len(nodesID)) + + // test nodes of reassignTest contains no channel + nodeChanInfo := chManager.store.GetNode(reassignTest.nodeID) + assert.Equal(t, 0, len(nodeChanInfo.Channels)) + + // test all channels are assgined to node of remainTest + nodeChanInfo = chManager.store.GetNode(remainTest.nodeID) + assert.Equal(t, 2, len(nodeChanInfo.Channels)) + assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels) + + // Delete node of reassginTest and try to toDelete node in remainTest + err = chManager.DeleteNode(reassignTest.nodeID) + require.NoError(t, err) + + err = chManager.toDelete(remainTest.nodeID, remainTest.chName) + assert.NoError(t, err) + chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) + + // channel is added to bufferID because there's only one node left + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, remainTest.chName, collectionID) + }) + t.Run("test cleanUpAndDelete", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + var collectionID = UniqueID(6) + + tests := []struct { + isvalid bool + nodeID UniqueID 
+ chName string + }{ + {true, UniqueID(126), "normal-chan"}, + {true, UniqueID(116), "to-delete-chan"}, + {false, UniqueID(9), "invalid-chan"}, + } + + factory := msgstream.NewRmsFactory() + _, err := factory.NewMsgStream(context.TODO()) + require.NoError(t, err) + chManager, err := NewChannelManager(metakv, newMockHandler(), withMsgstreamFactory(factory)) + + require.NoError(t, err) + + // prepare tests + for _, test := range tests { + if test.isvalid { + chManager.store.Add(test.nodeID) + ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID}) + err = chManager.store.Update(ops) + require.NoError(t, err) + + info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName)) + require.NoError(t, err) + require.NotNil(t, info) + } + } + + remainTest, reassignTest := tests[0], tests[1] + err = chManager.cleanUpAndDelete(reassignTest.nodeID, reassignTest.chName) + assert.NoError(t, err) + chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) + + // test no nodes are removed from store + nodesID := chManager.store.GetNodes() + assert.Equal(t, 2, len(nodesID)) + + // test nodes of reassignTest contains no channel + nodeChanInfo := chManager.store.GetNode(reassignTest.nodeID) + assert.Equal(t, 0, len(nodeChanInfo.Channels)) + + // test all channels are assgined to node of remainTest + nodeChanInfo = chManager.store.GetNode(remainTest.nodeID) + assert.Equal(t, 2, len(nodeChanInfo.Channels)) + assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels) + + // Delete node of reassginTest and try to cleanUpAndDelete node in remainTest + err = chManager.DeleteNode(reassignTest.nodeID) + require.NoError(t, err) + + err = chManager.cleanUpAndDelete(remainTest.nodeID, remainTest.chName) + assert.NoError(t, err) + chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) + + // channel is added to bufferID because there's only one node left + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, remainTest.chName, collectionID) + }) + + t.Run("test getChannelByNodeAndName", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + var ( + nodeID = UniqueID(113) + collectionID = UniqueID(3) + channelName = "get-channel-by-node-and-name" + ) + + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + + ch := chManager.getChannelByNodeAndName(nodeID, channelName) + assert.Nil(t, ch) + + chManager.store.Add(nodeID) + ch = chManager.getChannelByNodeAndName(nodeID, channelName) + assert.Nil(t, ch) + + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{{channelName, collectionID}}}, + }, + } + ch = chManager.getChannelByNodeAndName(nodeID, channelName) + assert.NotNil(t, ch) + assert.Equal(t, collectionID, ch.CollectionID) + assert.Equal(t, channelName, ch.Name) + }) + + t.Run("test fillChannelWatchInfoWithState", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + var ( + nodeID = UniqueID(111) + collectionID = UniqueID(1) + channelName = "fill-channel-watchInfo-with-state" + ) + + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + + tests := []struct { + inState datapb.ChannelWatchState + + description string + }{ + {datapb.ChannelWatchState_ToWatch, "fill toWatch state"}, + {datapb.ChannelWatchState_ToRelease, "fill 
toRelase state"}, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + ops := getReleaseOp(nodeID, &channel{channelName, collectionID}) + for _, op := range ops { + chs := chManager.fillChannelWatchInfoWithState(op, test.inState) + assert.Equal(t, 1, len(chs)) + assert.Equal(t, channelName, chs[0]) + assert.Equal(t, 1, len(op.ChannelWatchInfos)) + assert.Equal(t, test.inState, op.ChannelWatchInfos[0].GetState()) + + chManager.stateTimer.removeTimers(chs) + } + }) + } + }) + + t.Run("test updateWithTimer", func(t *testing.T) { + var ( + nodeID = UniqueID(112) + collectionID = UniqueID(2) + channelName = "update-with-timer" + ) + + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + chManager.store.Add(nodeID) + + opSet := getReleaseOp(nodeID, &channel{channelName, collectionID}) + + chManager.updateWithTimer(opSet, datapb.ChannelWatchState_ToWatch) + chManager.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, channelName, nodeID}) + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channelName, collectionID) + }) +} + +func TestChannelManager_Reload(t *testing.T) { + metakv := getMetaKv(t) + defer func() { + metakv.RemoveWithPrefix("") + metakv.Close() + }() + + var ( + nodeID = UniqueID(200) + collectionID = UniqueID(2) + channelName = "channel-checkOldNodes" + ) + prefix := Params.DataCoordCfg.ChannelWatchSubPath + + getWatchInfoWithState := func(state datapb.ChannelWatchState) *datapb.ChannelWatchInfo { + return &datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: channelName, + }, + State: state, + TimeoutTs: time.Now().Add(20 * time.Second).UnixNano(), + } + } + + t.Run("test checkOldNodes", func(t *testing.T) { + metakv.RemoveWithPrefix("") + + t.Run("ToWatch", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToWatch)) + require.NoError(t, err) + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data)) + require.NoError(t, err) + + chManager.checkOldNodes([]UniqueID{nodeID}) + _, ok := chManager.stateTimer.runningTimers.Load(channelName) + assert.True(t, ok) + chManager.stateTimer.removeTimers([]string{channelName}) + }) + + t.Run("ToRelease", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToRelease)) + require.NoError(t, err) + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data)) + require.NoError(t, err) + err = chManager.checkOldNodes([]UniqueID{nodeID}) + assert.NoError(t, err) + + _, ok := chManager.stateTimer.runningTimers.Load(channelName) + assert.True(t, ok) + chManager.stateTimer.removeTimers([]string{channelName}) + }) + + t.Run("WatchFail", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{{channelName, collectionID}}}}, + } + + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure)) + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, 
strconv.FormatInt(nodeID, 10), channelName), string(data)) + require.NoError(t, err) + err = chManager.checkOldNodes([]UniqueID{nodeID}) + assert.NoError(t, err) + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID) + chManager.stateTimer.removeTimers([]string{channelName}) + }) + + t.Run("ReleaseSuccess", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess)) + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{{channelName, collectionID}}}}, + } + chManager.AddNode(bufferID) + + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data)) + require.NoError(t, err) + err = chManager.checkOldNodes([]UniqueID{nodeID}) + assert.NoError(t, err) + + v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) + assert.Error(t, err) + assert.Empty(t, v) + }) + + t.Run("ReleaseFail", func(t *testing.T) { + defer metakv.RemoveWithPrefix("") + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess)) + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{{channelName, collectionID}}}, + 999: {999, []*channel{}}, + }, + } + require.NoError(t, err) + err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data)) + require.NoError(t, err) + err = chManager.checkOldNodes([]UniqueID{nodeID, 999}) + assert.NoError(t, err) + + time.Sleep(time.Second) + v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) + assert.Error(t, err) + assert.Empty(t, v) + + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 999, channelName, collectionID) + }) + }) + t.Run("test reload with data", func(t *testing.T) { - Params.Init() - kv := memkv.NewMemoryKV() + defer metakv.RemoveWithPrefix("") + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + hash := consistent.New() - cm, err := NewChannelManager(kv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash))) + cm, err := NewChannelManager(metakv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash))) assert.Nil(t, err) assert.Nil(t, cm.AddNode(1)) assert.Nil(t, cm.AddNode(2)) @@ -37,9 +727,9 @@ func TestReload(t *testing.T) { assert.Nil(t, cm.Watch(&channel{"channel2", 1})) hash2 := consistent.New() - cm2, err := NewChannelManager(kv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash2))) + cm2, err := NewChannelManager(metakv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash2))) assert.Nil(t, err) - assert.Nil(t, cm2.Startup([]int64{1, 2})) + assert.Nil(t, cm2.Startup(ctx, []int64{1, 2})) assert.Nil(t, cm2.AddNode(3)) assert.True(t, cm2.Match(3, "channel1")) assert.True(t, cm2.Match(3, "channel2")) @@ -47,6 +737,12 @@ func TestReload(t *testing.T) { } func TestChannelManager_RemoveChannel(t *testing.T) { + metakv := getMetaKv(t) + defer func() { + metakv.RemoveWithPrefix("") + metakv.Close() + }() + type fields struct { store RWChannelStore } @@ -63,7 +759,7 @@ func TestChannelManager_RemoveChannel(t *testing.T) { "test remove 
existed channel", fields{ store: &ChannelStore{ - store: memkv.NewMemoryKV(), + store: metakv, channelsInfo: map[int64]*NodeChannelInfo{ 1: { NodeID: 1, diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 4f8789c5c9ffa..0ca372b16ae23 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -44,7 +44,7 @@ func NewCluster(sessionManager *SessionManager, channelManager *ChannelManager) } // Startup inits the cluster with the given data nodes. -func (c *Cluster) Startup(nodes []*NodeInfo) error { +func (c *Cluster) Startup(ctx context.Context, nodes []*NodeInfo) error { for _, node := range nodes { c.sessionManager.AddSession(node) } @@ -52,7 +52,7 @@ func (c *Cluster) Startup(nodes []*NodeInfo) error { for _, node := range nodes { currs = append(currs, node.NodeID) } - return c.channelManager.Startup(currs) + return c.channelManager.Startup(ctx, currs) } // Register registers a new node in cluster diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go index 594add265ce23..01b27e2d2245e 100644 --- a/internal/datacoord/cluster_test.go +++ b/internal/datacoord/cluster_test.go @@ -23,16 +23,35 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/kv" - memkv "github.com/milvus-io/milvus/internal/kv/mem" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "stathat.com/c/consistent" ) +func getMetaKv(t *testing.T) kv.MetaKv { + Params.Init() + rootPath := "/etcd/test/root/" + t.Name() + metakv, err := etcdkv.NewMetaKvFactory(rootPath, &Params.EtcdCfg) + require.NoError(t, err) + + return metakv +} + func TestClusterCreate(t *testing.T) { + kv := getMetaKv(t) + defer func() { + kv.RemoveWithPrefix("") + kv.Close() + }() + t.Run("startup normally", func(t *testing.T) { - kv := memkv.NewMemoryKV() + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + sessionManager := NewSessionManager() channelManager, err := NewChannelManager(kv, newMockHandler()) assert.Nil(t, err) @@ -44,7 +63,7 @@ func TestClusterCreate(t *testing.T) { Address: addr, } nodes := []*NodeInfo{info} - err = cluster.Startup(nodes) + err = cluster.Startup(ctx, nodes) assert.Nil(t, err) dataNodes := sessionManager.GetSessions() assert.EqualValues(t, 1, len(dataNodes)) @@ -52,9 +71,12 @@ func TestClusterCreate(t *testing.T) { }) t.Run("startup with existed channel data", func(t *testing.T) { - Params.Init() + defer kv.RemoveWithPrefix("") + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + var err error - kv := memkv.NewMemoryKV() info1 := &datapb.ChannelWatchInfo{ Vchan: &datapb.VchannelInfo{ CollectionID: 1, @@ -72,7 +94,7 @@ func TestClusterCreate(t *testing.T) { cluster := NewCluster(sessionManager, channelManager) defer cluster.Close() - err = cluster.Startup([]*NodeInfo{{NodeID: 1, Address: "localhost:9999"}}) + err = cluster.Startup(ctx, []*NodeInfo{{NodeID: 1, Address: "localhost:9999"}}) assert.Nil(t, err) channels := channelManager.GetChannels() @@ -80,7 +102,11 @@ func TestClusterCreate(t *testing.T) { }) t.Run("remove all nodes and restart with other nodes", func(t *testing.T) { - kv := memkv.NewMemoryKV() + defer kv.RemoveWithPrefix("") + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + sessionManager := NewSessionManager() channelManager, err := NewChannelManager(kv, 
newMockHandler()) assert.Nil(t, err) @@ -92,7 +118,7 @@ func TestClusterCreate(t *testing.T) { Address: addr, } nodes := []*NodeInfo{info} - err = cluster.Startup(nodes) + err = cluster.Startup(ctx, nodes) assert.Nil(t, err) err = cluster.UnRegister(info) @@ -114,7 +140,7 @@ func TestClusterCreate(t *testing.T) { Address: addr, } nodes = []*NodeInfo{info} - err = clusterReload.Startup(nodes) + err = clusterReload.Startup(ctx, nodes) assert.Nil(t, err) sessions = sessionManager2.GetSessions() assert.EqualValues(t, 1, len(sessions)) @@ -126,8 +152,9 @@ func TestClusterCreate(t *testing.T) { }) t.Run("loadKv Fails", func(t *testing.T) { - kv := memkv.NewMemoryKV() - fkv := &loadPrefixFailKV{TxnKV: kv} + defer kv.RemoveWithPrefix("") + + fkv := &loadPrefixFailKV{MetaKv: kv} _, err := NewChannelManager(fkv, newMockHandler()) assert.NotNil(t, err) }) @@ -135,7 +162,7 @@ func TestClusterCreate(t *testing.T) { // a mock kv that always fail when LoadWithPrefix type loadPrefixFailKV struct { - kv.TxnKV + kv.MetaKv } // LoadWithPrefix override behavior @@ -144,15 +171,25 @@ func (kv *loadPrefixFailKV) LoadWithPrefix(key string) ([]string, []string, erro } func TestRegister(t *testing.T) { + kv := getMetaKv(t) + defer func() { + kv.RemoveWithPrefix("") + kv.Close() + }() + t.Run("register to empty cluster", func(t *testing.T) { - kv := memkv.NewMemoryKV() + defer kv.RemoveWithPrefix("") + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + sessionManager := NewSessionManager() channelManager, err := NewChannelManager(kv, newMockHandler()) assert.Nil(t, err) cluster := NewCluster(sessionManager, channelManager) defer cluster.Close() addr := "localhost:8080" - err = cluster.Startup(nil) + err = cluster.Startup(ctx, nil) assert.Nil(t, err) info := &NodeInfo{ NodeID: 1, @@ -166,7 +203,11 @@ func TestRegister(t *testing.T) { }) t.Run("register to empty cluster with buffer channels", func(t *testing.T) { - kv := memkv.NewMemoryKV() + defer kv.RemoveWithPrefix("") + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + sessionManager := NewSessionManager() channelManager, err := NewChannelManager(kv, newMockHandler()) assert.Nil(t, err) @@ -178,7 +219,7 @@ func TestRegister(t *testing.T) { cluster := NewCluster(sessionManager, channelManager) defer cluster.Close() addr := "localhost:8080" - err = cluster.Startup(nil) + err = cluster.Startup(ctx, nil) assert.Nil(t, err) info := &NodeInfo{ NodeID: 1, @@ -195,13 +236,17 @@ func TestRegister(t *testing.T) { }) t.Run("register and restart with no channel", func(t *testing.T) { - kv := memkv.NewMemoryKV() + defer kv.RemoveWithPrefix("") + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + sessionManager := NewSessionManager() channelManager, err := NewChannelManager(kv, newMockHandler()) assert.Nil(t, err) cluster := NewCluster(sessionManager, channelManager) addr := "localhost:8080" - err = cluster.Startup(nil) + err = cluster.Startup(ctx, nil) assert.Nil(t, err) info := &NodeInfo{ NodeID: 1, @@ -222,8 +267,18 @@ func TestRegister(t *testing.T) { } func TestUnregister(t *testing.T) { + kv := getMetaKv(t) + defer func() { + kv.RemoveWithPrefix("") + kv.Close() + }() + t.Run("remove node after unregister", func(t *testing.T) { - kv := memkv.NewMemoryKV() + defer kv.RemoveWithPrefix("") + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + sessionManager := NewSessionManager() channelManager, err := NewChannelManager(kv, newMockHandler()) assert.Nil(t, err) @@ -235,7 +290,7 @@ func 
@@ -235,7 +290,7 @@ func TestUnregister(t *testing.T) {
 			NodeID: 1,
 		}
 		nodes := []*NodeInfo{info}
-		err = cluster.Startup(nodes)
+		err = cluster.Startup(ctx, nodes)
 		assert.Nil(t, err)
 		err = cluster.UnRegister(nodes[0])
 		assert.Nil(t, err)
@@ -244,7 +299,11 @@ func TestUnregister(t *testing.T) {
 	})
 
 	t.Run("move channels to online nodes after unregister", func(t *testing.T) {
-		kv := memkv.NewMemoryKV()
+		defer kv.RemoveWithPrefix("")
+
+		ctx, cancel := context.WithCancel(context.TODO())
+		defer cancel()
+
 		sessionManager := NewSessionManager()
 		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
@@ -260,7 +319,7 @@ func TestUnregister(t *testing.T) {
 			NodeID: 2,
 		}
 		nodes := []*NodeInfo{nodeInfo1, nodeInfo2}
-		err = cluster.Startup(nodes)
+		err = cluster.Startup(ctx, nodes)
 		assert.Nil(t, err)
 		err = cluster.Watch("ch1", 1)
 		assert.Nil(t, err)
@@ -275,10 +334,14 @@ func TestUnregister(t *testing.T) {
 	})
 
 	t.Run("remove all channels after unregister", func(t *testing.T) {
+		defer kv.RemoveWithPrefix("")
+
+		ctx, cancel := context.WithCancel(context.TODO())
+		defer cancel()
+
 		var mockSessionCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
 			return newMockDataNodeClient(1, nil)
 		}
-		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
 		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
@@ -289,7 +352,7 @@ func TestUnregister(t *testing.T) {
 			Address: "localhost:8080",
 			NodeID: 1,
 		}
-		err = cluster.Startup([]*NodeInfo{nodeInfo})
+		err = cluster.Startup(ctx, []*NodeInfo{nodeInfo})
 		assert.Nil(t, err)
 		err = cluster.Watch("ch_1", 1)
 		assert.Nil(t, err)
@@ -305,11 +368,21 @@
 }
 
 func TestWatchIfNeeded(t *testing.T) {
+	kv := getMetaKv(t)
+	defer func() {
+		kv.RemoveWithPrefix("")
+		kv.Close()
+	}()
+
 	t.Run("add duplicated channel to cluster", func(t *testing.T) {
+		defer kv.RemoveWithPrefix("")
+
+		ctx, cancel := context.WithCancel(context.TODO())
+		defer cancel()
+
 		var mockSessionCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
 			return newMockDataNodeClient(1, nil)
 		}
-		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
 		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
@@ -322,7 +395,7 @@ func TestWatchIfNeeded(t *testing.T) {
 			NodeID: 1,
 		}
 
-		err = cluster.Startup([]*NodeInfo{info})
+		err = cluster.Startup(ctx, []*NodeInfo{info})
 		assert.Nil(t, err)
 		err = cluster.Watch("ch1", 1)
 		assert.Nil(t, err)
@@ -332,7 +405,8 @@ func TestWatchIfNeeded(t *testing.T) {
 	})
 
 	t.Run("watch channel to empty cluster", func(t *testing.T) {
-		kv := memkv.NewMemoryKV()
+		defer kv.RemoveWithPrefix("")
+
 		sessionManager := NewSessionManager()
 		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
@@ -351,7 +425,12 @@
 }
 
 func TestConsistentHashPolicy(t *testing.T) {
-	kv := memkv.NewMemoryKV()
+	kv := getMetaKv(t)
+	defer func() {
+		kv.RemoveWithPrefix("")
+		kv.Close()
+	}()
+
 	sessionManager := NewSessionManager()
 	chash := consistent.New()
 	factory := NewConsistentHashChannelPolicyFactory(chash)
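Every Startup call in these tests now threads a context through to the channel manager, whose background watchers need a cancellation signal to exit. A short sketch of the resulting setup/teardown shape, assuming only the constructors shown in this diff; startTestCluster is a hypothetical helper, not part of the patch:

// Hypothetical helper, in package datacoord: ties the cluster's background
// goroutines to a cancel func the test controls, mirroring the ctx/cancel
// pairs threaded through the subtests above.
func startTestCluster(t *testing.T, metakv kv.MetaKv) (*Cluster, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	sessionManager := NewSessionManager()
	channelManager, err := NewChannelManager(metakv, newMockHandler())
	require.NoError(t, err)
	cluster := NewCluster(sessionManager, channelManager)
	// Startup forwards ctx to channelManager.Startup, so calling cancel()
	// stops the channel manager's watchers along with the test.
	err = cluster.Startup(ctx, []*NodeInfo{{NodeID: 1, Address: "localhost:8080"}})
	require.NoError(t, err)
	return cluster, cancel
}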
@@ -428,7 +507,14 @@
 }
 
 func TestCluster_Flush(t *testing.T) {
-	kv := memkv.NewMemoryKV()
+	kv := getMetaKv(t)
+	defer func() {
+		kv.RemoveWithPrefix("")
+		kv.Close()
+	}()
+
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
 	sessionManager := NewSessionManager()
 	channelManager, err := NewChannelManager(kv, newMockHandler())
 	assert.Nil(t, err)
@@ -440,7 +526,7 @@ func TestCluster_Flush(t *testing.T) {
 		NodeID: 1,
 	}
 	nodes := []*NodeInfo{info}
-	err = cluster.Startup(nodes)
+	err = cluster.Startup(ctx, nodes)
 	assert.Nil(t, err)
 
 	err = cluster.Watch("chan-1", 1)
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 9896e338e86b3..2d6fd6ece1ce5 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -307,7 +307,7 @@ func (s *Server) initCluster() error {
 	}
 
 	var err error
-	s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.msFactory))
+	s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.msFactory), withStateChecker())
 	if err != nil {
 		return err
 	}
@@ -404,7 +404,7 @@ func (s *Server) initServiceDiscovery() error {
 		datanodes = append(datanodes, info)
 	}
 
-	s.cluster.Startup(datanodes)
+	s.cluster.Startup(s.ctx, datanodes)
 
 	// TODO implement rewatch logic
 	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index d3611051ae9f3..a2fcdd402fa8d 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -31,7 +31,6 @@ import (
 	"time"
 
 	"github.com/milvus-io/milvus/internal/common"
-	memkv "github.com/milvus-io/milvus/internal/kv/mem"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/mq/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
@@ -1967,6 +1966,12 @@ func TestGetCompactionStateWithPlans(t *testing.T) {
 }
 
 func TestOptions(t *testing.T) {
+	kv := getMetaKv(t)
+	defer func() {
+		kv.RemoveWithPrefix("")
+		kv.Close()
+	}()
+
 	t.Run("SetRootCoordCreator", func(t *testing.T) {
 		svr := newTestServer(t, nil)
 		defer closeTestServer(t, svr)
@@ -1983,7 +1988,8 @@ func TestOptions(t *testing.T) {
 		assert.NotNil(t, svr.rootCoordClientCreator)
 	})
 	t.Run("SetCluster", func(t *testing.T) {
-		kv := memkv.NewMemoryKV()
+		defer kv.RemoveWithPrefix("")
+
 		sessionManager := NewSessionManager()
 		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
@@ -2031,14 +2037,21 @@ func (p *mockPolicyFactory) NewDeregisterPolicy() DeregisterPolicy {
 }
 
 func TestHandleSessionEvent(t *testing.T) {
-	kv := memkv.NewMemoryKV()
+	kv := getMetaKv(t)
+	defer func() {
+		kv.RemoveWithPrefix("")
+		kv.Close()
+	}()
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+
 	channelManager, err := NewChannelManager(kv, newMockHandler(), withFactory(&mockPolicyFactory{}))
 	assert.Nil(t, err)
 	sessionManager := NewSessionManager()
 	cluster := NewCluster(sessionManager, channelManager)
 	assert.Nil(t, err)
 
-	err = cluster.Startup(nil)
+	err = cluster.Startup(ctx, nil)
 	assert.Nil(t, err)
 	defer cluster.Close()
 
@@ -2259,6 +2272,8 @@ func TestImport(t *testing.T) {
 		assert.Nil(t, err)
 		assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.GetErrorCode())
+		etcd.StopEtcdServer()
+
 	})
 	t.Run("with closed server", func(t *testing.T) {
 		svr := newTestServer(t, nil)
@@ -2316,9 +2331,9 @@ func (ms *MockClosePanicMsgstream) Chan() <-chan *msgstream.MsgPack {
 }
 
 func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
+	var err error
 	Params.Init()
 	Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
-	var err error
 	factory := msgstream.NewPmsFactory()
 	err = factory.Init(&Params)
 	assert.Nil(t, err)
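initCluster above enables the channel state checker through a new withStateChecker() option, and newTestServer later shuts it down via svr.channelManager.stopChecker so background etcd watches do not leak across tests. One plausible wiring, with stopChecker stored as a context.CancelFunc; the option type, field names, and checkLoop below are assumptions, since the diff shows only the call sites:

// Sketch under assumptions, in package datacoord: ChannelManagerOpt,
// enableStateChecker, and checkLoop are illustrative names for machinery the
// diff references but does not show.
type ChannelManagerOpt func(c *ChannelManager)

func withStateChecker() ChannelManagerOpt {
	return func(c *ChannelManager) { c.enableStateChecker = true }
}

func (c *ChannelManager) startStateChecker(parent context.Context) {
	var ctx context.Context
	ctx, c.stopChecker = context.WithCancel(parent)
	go c.checkLoop(ctx) // exits once stopChecker (the cancel func) is called
}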
@@ -2337,6 +2352,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se
 	svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
 		return newMockRootCoordService(), nil
 	}
+	assert.Nil(t, err)
 	err = svr.Init()
 	assert.Nil(t, err)
@@ -2344,6 +2360,12 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se
 	assert.Nil(t, err)
 	err = svr.Register()
 	assert.Nil(t, err)
+
+	// Stop channel watch state watcher in tests
+	if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
+		svr.channelManager.stopChecker()
+	}
+
 	return svr
 }
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index b4b741428c338..14c445dbfbd40 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -434,15 +434,12 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
 	}
 	log.Info("DropVChannel plan to remove", zap.String("channel", channel))
-	err = s.channelManager.RemoveChannel(channel)
+	err = s.channelManager.Release(nodeID, channel)
 	if err != nil {
-		log.Warn("DropVChannel failed to RemoveChannel", zap.String("channel", channel), zap.Error(err))
+		log.Warn("DropVChannel failed to ReleaseAndRemove", zap.String("channel", channel), zap.Error(err))
 	}
 	s.segmentManager.DropSegmentsOfChannel(ctx, channel)
-	// clean up removal flag
-	s.meta.FinishRemoveChannel(channel)
-
 	// no compaction triggered in Drop procedure
 	resp.Status.ErrorCode = commonpb.ErrorCode_Success
 	return resp, nil
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 7354c5a87f7a1..cab5ae52f5dab 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -304,9 +304,8 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
 
 func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
 	switch e.eventType {
-	case putEventType:
-		log.Info("DataNode is handling watchInfo put event", zap.String("key", key))
+	case putEventType:
 		watchInfo, err := parsePutEventData(data)
 		if err != nil {
 			log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err))
@@ -314,12 +313,13 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
 		}
 
 		if isEndWatchState(watchInfo.State) {
-			log.Warn("DataNode received a PUT event with a end State", zap.String("state", watchInfo.State.String()))
+			log.Warn("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
 			return
 		}
 
 		e.info = watchInfo
 		e.vChanName = watchInfo.GetVchan().GetChannelName()
+		log.Info("DataNode is handling watchInfo put event", zap.String("key", key), zap.String("state", watchInfo.GetState().String()))
 
 	case deleteEventType:
 		log.Info("DataNode is handling watchInfo delete event", zap.String("key", key))
@@ -354,7 +354,6 @@ func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) {
 	if watchInfo.Vchan == nil {
 		return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo")
 	}
-
 	return &watchInfo, nil
 }
 
@@ -366,6 +365,7 @@ func parseDeleteEventKey(key string) string {
 
 func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
 	vChanName := watchInfo.GetVchan().GetChannelName()
+	log.Info("handle put event", zap.String("watch state", watchInfo.State.String()), zap.String("vChanName", vChanName))
 
 	switch watchInfo.State {
 	case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch: