Skip to content

Commit

Permalink
Adopt the extended state in DataCoord (milvus-io#16200)
Browse files Browse the repository at this point in the history
ChannelWatchInfo with ToWatch, ToRelease will trigger a timer.
ChannelManager now reacts to different ChannelWatch states.

- WatchSuccess > log this info

- WatchFailure/WatchTimeout > ToRelease

- ReleaseSuccess > Delete, reassign if not from DropCollection

- ReleaseFailure/ReleaseTimeout > Cleanup subscription and Delete,
  reassgin if not from DropCollection.

Some Notes:
1. Reassignment will add this channel to buffer if there's only one node.

See also: milvus-io#15846

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Mar 28, 2022
1 parent 3936637 commit a77dd10
Show file tree
Hide file tree
Showing 11 changed files with 1,613 additions and 87 deletions.
2 changes: 1 addition & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ AlignTrailingComments: true
SortIncludes: false
Standard: Latest
AlignAfterOpenBracket: Align
BinPackParameters: false
BinPackParameters: false
178 changes: 178 additions & 0 deletions internal/datacoord/channel_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
"fmt"
"path"
"strconv"
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"

clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type channelStateTimer struct {
watchkv kv.MetaKv
runningTimers sync.Map // channel name to timer stop channels
etcdWatcher clientv3.WatchChan
timeoutWatcher chan *ackEvent
}

func newChannelStateTimer(kv kv.MetaKv) *channelStateTimer {
return &channelStateTimer{
watchkv: kv,
timeoutWatcher: make(chan *ackEvent, 20),
}
}

func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan *ackEvent) {
if c.etcdWatcher == nil {
c.etcdWatcher = c.watchkv.WatchWithPrefix(prefix)

}
return c.etcdWatcher, c.timeoutWatcher
}

func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelWatchInfo, error) {
prefix := path.Join(Params.DataCoordCfg.ChannelWatchSubPath, strconv.FormatInt(nodeID, 10))

// TODO: change to LoadWithPrefixBytes
keys, values, err := c.watchkv.LoadWithPrefix(prefix)
if err != nil {
return nil, err
}

ret := []*datapb.ChannelWatchInfo{}

for i, k := range keys {
watchInfo, err := parseWatchInfo(k, []byte(values[i]))
if err != nil {
// TODO: delete this kv later
log.Warn("invalid watchInfo loaded", zap.Error(err))
continue
}

ret = append(ret, watchInfo)
}

return ret, nil
}

// startOne can write ToWatch or ToRelease states.
func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channelName string, nodeID UniqueID, timeoutTs int64) {
if timeoutTs == 0 {
log.Debug("zero timeoutTs, skip starting timer",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
)
return
}
stop := make(chan struct{})
c.runningTimers.Store(channelName, stop)
timeoutT := time.Unix(0, timeoutTs)
go func() {
log.Debug("timer started",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
select {
case <-time.NewTimer(time.Until(timeoutT)).C:
log.Info("timeout and stop timer: wait for channel ACK timeout",
zap.String("state", watchState.String()),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
ackType := getAckType(watchState)
c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID})
case <-stop:
log.Debug("stop timer before timeout",
zap.String("state", watchState.String()),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
}
}()
}

func (c *channelStateTimer) notifyTimeoutWatcher(e *ackEvent) {
c.timeoutWatcher <- e
}

func (c *channelStateTimer) removeTimers(channels []string) {
for _, channel := range channels {
if stop, ok := c.runningTimers.LoadAndDelete(channel); ok {
close(stop.(chan struct{}))
}
}
}

func (c *channelStateTimer) stopIfExsit(e *ackEvent) {
stop, ok := c.runningTimers.LoadAndDelete(e.channelName)
if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
close(stop.(chan struct{}))
}
}

func parseWatchInfo(key string, data []byte) (*datapb.ChannelWatchInfo, error) {
watchInfo := datapb.ChannelWatchInfo{}
if err := proto.Unmarshal(data, &watchInfo); err != nil {
return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, key: %s, err: %v", key, err)

}

if watchInfo.Vchan == nil {
return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo, key: %s", key)
}

return &watchInfo, nil
}

// parseAckEvent transfers key-values from etcd into ackEvent
func parseAckEvent(nodeID UniqueID, info *datapb.ChannelWatchInfo) *ackEvent {
ret := &ackEvent{
ackType: getAckType(info.GetState()),
channelName: info.GetVchan().GetChannelName(),
nodeID: nodeID,
}
return ret
}

func getAckType(state datapb.ChannelWatchState) ackType {
switch state {
case datapb.ChannelWatchState_WatchSuccess, datapb.ChannelWatchState_Complete:
return watchSuccessAck
case datapb.ChannelWatchState_WatchFailure:
return watchFailAck
case datapb.ChannelWatchState_ReleaseSuccess:
return releaseSuccessAck
case datapb.ChannelWatchState_ReleaseFailure:
return releaseFailAck
case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete: // unchange watch states generates timeout acks
return watchTimeoutAck
case datapb.ChannelWatchState_ToRelease: // unchange watch states generates timeout acks
return releaseTimeoutAck
default:
return invalidAck
}
}
198 changes: 198 additions & 0 deletions internal/datacoord/channel_checker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
"path"
"testing"
"time"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/golang/protobuf/proto"
)

func TestChannelStateTimer(t *testing.T) {
kv := getMetaKv(t)
defer kv.Close()

prefix := Params.DataCoordCfg.ChannelWatchSubPath

t.Run("test getWatcher", func(t *testing.T) {
timer := newChannelStateTimer(kv)

etcdCh, timeoutCh := timer.getWatchers(prefix)
assert.NotNil(t, etcdCh)
assert.NotNil(t, timeoutCh)

timer.getWatchers(prefix)
assert.NotNil(t, etcdCh)
assert.NotNil(t, timeoutCh)
})

t.Run("test loadAllChannels", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
timer := newChannelStateTimer(kv)
timer.loadAllChannels(1)

validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)

prefix = Params.DataCoordCfg.ChannelWatchSubPath
prepareKvs := map[string]string{
path.Join(prefix, "1/channel-1"): "invalidWatchInfo",
path.Join(prefix, "1/channel-2"): string(validData),
path.Join(prefix, "2/channel-3"): string(validData),
}

err = kv.MultiSave(prepareKvs)
require.NoError(t, err)

tests := []struct {
inNodeID UniqueID
outLen int
}{
{1, 1},
{2, 1},
{3, 0},
}

for _, test := range tests {
infos, err := timer.loadAllChannels(test.inNodeID)
assert.NoError(t, err)
assert.Equal(t, test.outLen, len(infos))
}
})

t.Run("test startOne", func(t *testing.T) {
normalTimeoutTs := time.Now().Add(20 * time.Second).UnixNano()
nowTimeoutTs := time.Now().UnixNano()
zeroTimeoutTs := int64(0)
tests := []struct {
channelName string
timeoutTs int64

description string
}{
{"channel-1", normalTimeoutTs, "test stop"},
{"channel-2", nowTimeoutTs, "test timeout"},
{"channel-3", zeroTimeoutTs, "not start"},
}

timer := newChannelStateTimer(kv)

_, timeoutCh := timer.getWatchers(prefix)

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
timer.startOne(datapb.ChannelWatchState_ToWatch, test.channelName, 1, test.timeoutTs)
if test.timeoutTs == nowTimeoutTs {
e := <-timeoutCh
assert.Equal(t, watchTimeoutAck, e.ackType)
assert.Equal(t, test.channelName, e.channelName)
} else {
timer.stopIfExsit(&ackEvent{watchSuccessAck, test.channelName, 1})
}
})
}

timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-remove", 1, normalTimeoutTs)
timer.removeTimers([]string{"channel-remove"})
})
}

func TestChannelStateTimer_parses(t *testing.T) {
const (
ValidTest = true
InValidTest = false
)

t.Run("test parseWatchInfo", func(t *testing.T) {
validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)

invalidDataUnableToMarshal := []byte("invalidData")

invalidWatchInfoNilVchan := validWatchInfo
invalidWatchInfoNilVchan.Vchan = nil
invalidDataNilVchan, err := proto.Marshal(&invalidWatchInfoNilVchan)
require.NoError(t, err)

tests := []struct {
inKey string
inData []byte

isValid bool
description string
}{
{"key", validData, ValidTest, "test with valid watchInfo"},
{"key", invalidDataUnableToMarshal, InValidTest, "test with watchInfo unable to marshal"},
{"key", invalidDataNilVchan, InValidTest, "test with watchInfo with nil Vchan"},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
info, err := parseWatchInfo(test.inKey, test.inData)
if test.isValid {
assert.NoError(t, err)
assert.NotNil(t, info)
assert.Equal(t, info.GetState(), validWatchInfo.GetState())
assert.Equal(t, info.GetStartTs(), validWatchInfo.GetStartTs())
assert.Equal(t, info.GetTimeoutTs(), validWatchInfo.GetTimeoutTs())
} else {
assert.Nil(t, info)
assert.Error(t, err)
}
})
}
})

t.Run("test getAckType", func(t *testing.T) {
tests := []struct {
inState datapb.ChannelWatchState
outAckType ackType
}{
{datapb.ChannelWatchState_WatchSuccess, watchSuccessAck},
{datapb.ChannelWatchState_WatchFailure, watchFailAck},
{datapb.ChannelWatchState_ToWatch, watchTimeoutAck},
{datapb.ChannelWatchState_Uncomplete, watchTimeoutAck},
{datapb.ChannelWatchState_ReleaseSuccess, releaseSuccessAck},
{datapb.ChannelWatchState_ReleaseFailure, releaseFailAck},
{datapb.ChannelWatchState_ToRelease, releaseTimeoutAck},
{100, invalidAck},
}

for _, test := range tests {
assert.Equal(t, test.outAckType, getAckType(test.inState))
}

})
}
Loading

0 comments on commit a77dd10

Please sign in to comment.