From 22477d4601f49f121c3f392c817c6c76d80538b2 Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Fri, 23 Sep 2022 16:56:50 +0800 Subject: [PATCH] UnWatch channels if failed to create collection (#19390) Signed-off-by: longjiquan Signed-off-by: longjiquan --- internal/rootcoord/create_collection_task.go | 15 ++- .../rootcoord/create_collection_task_test.go | 19 ++- internal/rootcoord/garbage_collector.go | 12 +- internal/rootcoord/garbage_collector_test.go | 104 +++++++++++----- internal/rootcoord/mocks/GarbageCollector.go | 87 ++++++++++++++ internal/rootcoord/step.go | 7 +- internal/tso/global_allocator.go | 1 + internal/tso/mock_global_allocator.go | 2 + internal/tso/mocks/Allocator.go | 111 ++++++++++++++++++ scripts/run_go_codecov.sh | 4 +- 10 files changed, 320 insertions(+), 42 deletions(-) create mode 100644 internal/rootcoord/mocks/GarbageCollector.go create mode 100644 internal/tso/mocks/Allocator.go diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 5ae41c8d2ae99..18cbbf0ccf717 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -268,7 +268,14 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { }, &deleteCollectionMetaStep{ baseStep: baseStep{core: t.core}, collectionID: collID, - ts: ts, + // When we undo createCollectionTask, this ts may be less than the ts when unwatch channels. + ts: ts, + }) + // serve for this case: watching channels succeed in datacoord but failed due to network failure. + undoTask.AddStep(&nullStep{}, &unwatchChannelsStep{ + baseStep: baseStep{core: t.core}, + collectionID: collID, + channels: t.channels, }) undoTask.AddStep(&watchChannelsStep{ baseStep: baseStep{core: t.core}, @@ -278,11 +285,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { vChannels: t.channels.virtualChannels, startPositions: toKeyDataPairs(startPositions), }, - }, &unwatchChannelsStep{ - baseStep: baseStep{core: t.core}, - collectionID: collID, - channels: t.channels, - }) + }, &nullStep{}) undoTask.AddStep(&changeCollectionStateStep{ baseStep: baseStep{core: t.core}, collectionID: collID, diff --git a/internal/rootcoord/create_collection_task_test.go b/internal/rootcoord/create_collection_task_test.go index 14b8b7fd96c92..d5de17b50fc26 100644 --- a/internal/rootcoord/create_collection_task_test.go +++ b/internal/rootcoord/create_collection_task_test.go @@ -6,6 +6,10 @@ import ( "testing" "time" + mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" + + "github.com/stretchr/testify/mock" + "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/milvuspb" @@ -446,17 +450,26 @@ func Test_createCollectionTask_Execute(t *testing.T) { broker.WatchChannelsFunc = func(ctx context.Context, info *watchInfo) error { return nil } + unwatchChannelsCalled := false unwatchChannelsChan := make(chan struct{}, 1) - broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error { + gc := mockrootcoord.NewGarbageCollector(t) + gc.On("GcCollectionData", + mock.Anything, // context.Context + mock.Anything, // *model.Collection + ).Return(func(ctx context.Context, collection *model.Collection) (ddlTs Timestamp) { + for _, pchan := range pchans { + ticker.syncedTtHistogram.update(pchan, 101) + } unwatchChannelsCalled = true unwatchChannelsChan <- struct{}{} - return nil - } + return 100 + }, nil) core := newTestCore(withValidIDAllocator(), withMeta(meta), withTtSynchronizer(ticker), + withGarbageCollector(gc), withBroker(broker)) schema := &schemapb.CollectionSchema{ diff --git a/internal/rootcoord/garbage_collector.go b/internal/rootcoord/garbage_collector.go index 0ccbdfa7fb12e..0bbc6c46f7fec 100644 --- a/internal/rootcoord/garbage_collector.go +++ b/internal/rootcoord/garbage_collector.go @@ -10,6 +10,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" ) +//go:generate mockery --name=GarbageCollector --outpkg=mockrootcoord type GarbageCollector interface { ReDropCollection(collMeta *model.Collection, ts Timestamp) RemoveCreatingCollection(collMeta *model.Collection) @@ -62,7 +63,11 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim } func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection) { + // TODO: remove this after data gc can be notified by rpc. + c.s.chanTimeTick.addDmlChannels(collMeta.PhysicalChannelNames...) + redo := newBaseRedoTask(c.s.stepExecutor) + redo.AddAsyncStep(&unwatchChannelsStep{ baseStep: baseStep{core: c.s}, collectionID: collMeta.CollectionID, @@ -71,10 +76,15 @@ func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection physicalChannels: collMeta.PhysicalChannelNames, }, }) + redo.AddAsyncStep(&removeDmlChannelsStep{ + baseStep: baseStep{core: c.s}, + pChannels: collMeta.PhysicalChannelNames, + }) redo.AddAsyncStep(&deleteCollectionMetaStep{ baseStep: baseStep{core: c.s}, collectionID: collMeta.CollectionID, - ts: collMeta.CreateTime, + // When we undo createCollectionTask, this ts may be less than the ts when unwatch channels. + ts: collMeta.CreateTime, }) // err is ignored since no sync steps will be executed. _ = redo.Execute(context.Background()) diff --git a/internal/rootcoord/garbage_collector_test.go b/internal/rootcoord/garbage_collector_test.go index 09118625fa426..391a68a44a7e1 100644 --- a/internal/rootcoord/garbage_collector_test.go +++ b/internal/rootcoord/garbage_collector_test.go @@ -3,8 +3,12 @@ package rootcoord import ( "context" "errors" + "fmt" "testing" + mocktso "github.com/milvus-io/milvus/internal/tso/mocks" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus/internal/metastore/model" @@ -164,45 +168,87 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) { t.Run("failed to UnwatchChannels", func(t *testing.T) { - broker := newMockBroker() - broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error { - return errors.New("error mock UnwatchChannels") + defer cleanTestEnv() + + shardNum := 2 + + ticker := newRocksMqTtSynchronizer() + pchans := ticker.getDmlChannelNames(shardNum) + + tsoAllocator := mocktso.NewAllocator(t) + tsoAllocator. + On("GenerateTSO", mock.AnythingOfType("uint32")). + Return(Timestamp(0), errors.New("error mock GenerateTSO")) + + executed := make(chan struct{}, 1) + executor := newMockStepExecutor() + executor.AddStepsFunc = func(s *stepStack) { + s.Execute(context.Background()) + executed <- struct{}{} } - core := newTestCore(withBroker(broker)) + + core := newTestCore(withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withStepExecutor(executor)) gc := newBgGarbageCollector(core) + core.ddlTsLockManager = newDdlTsLockManagerV2(tsoAllocator) core.garbageCollector = gc - gc.RemoveCreatingCollection(&model.Collection{}) + + gc.RemoveCreatingCollection(&model.Collection{PhysicalChannelNames: pchans}) + <-executed }) t.Run("failed to RemoveCollection", func(t *testing.T) { - broker := newMockBroker() - unwatchChannelsCalled := false - unwatchChannelsChan := make(chan struct{}, 1) - broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error { - unwatchChannelsCalled = true - unwatchChannelsChan <- struct{}{} - return nil + defer cleanTestEnv() + + shardNum := 2 + + ticker := newRocksMqTtSynchronizer() + pchans := ticker.getDmlChannelNames(shardNum) + + tsoAllocator := mocktso.NewAllocator(t) + tsoAllocator. + On("GenerateTSO", mock.AnythingOfType("uint32")). + Return(Timestamp(100), nil) + + for _, pchan := range pchans { + ticker.syncedTtHistogram.update(pchan, 101) } + meta := newMockMetaTable() + removeCollectionCalled := false + removeCollectionChan := make(chan struct{}, 1) meta.RemoveCollectionFunc = func(ctx context.Context, collectionID UniqueID, ts Timestamp) error { - return errors.New("error mock RemoveCollection") + removeCollectionCalled = true + removeCollectionChan <- struct{}{} + return fmt.Errorf("error mock RemoveCollection") } - core := newTestCore(withBroker(broker), withMeta(meta)) + + core := newTestCore(withTtSynchronizer(ticker), withMeta(meta), withTsoAllocator(tsoAllocator)) gc := newBgGarbageCollector(core) - gc.RemoveCreatingCollection(&model.Collection{}) - <-unwatchChannelsChan - assert.True(t, unwatchChannelsCalled) + core.ddlTsLockManager = newDdlTsLockManagerV2(tsoAllocator) + core.garbageCollector = gc + + gc.RemoveCreatingCollection(&model.Collection{PhysicalChannelNames: pchans}) + <-removeCollectionChan + assert.True(t, removeCollectionCalled) // though it fail. }) t.Run("normal case", func(t *testing.T) { - broker := newMockBroker() - unwatchChannelsCalled := false - unwatchChannelsChan := make(chan struct{}, 1) - broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error { - unwatchChannelsCalled = true - unwatchChannelsChan <- struct{}{} - return nil + defer cleanTestEnv() + + shardNum := 2 + + ticker := newRocksMqTtSynchronizer() + pchans := ticker.getDmlChannelNames(shardNum) + + tsoAllocator := mocktso.NewAllocator(t) + tsoAllocator. + On("GenerateTSO", mock.AnythingOfType("uint32")). + Return(Timestamp(100), nil) + + for _, pchan := range pchans { + ticker.syncedTtHistogram.update(pchan, 101) } + meta := newMockMetaTable() removeCollectionCalled := false removeCollectionChan := make(chan struct{}, 1) @@ -211,11 +257,13 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) { removeCollectionChan <- struct{}{} return nil } - core := newTestCore(withBroker(broker), withMeta(meta)) + + core := newTestCore(withTtSynchronizer(ticker), withMeta(meta), withTsoAllocator(tsoAllocator)) gc := newBgGarbageCollector(core) - gc.RemoveCreatingCollection(&model.Collection{}) - <-unwatchChannelsChan - assert.True(t, unwatchChannelsCalled) + core.ddlTsLockManager = newDdlTsLockManagerV2(tsoAllocator) + core.garbageCollector = gc + + gc.RemoveCreatingCollection(&model.Collection{PhysicalChannelNames: pchans}) <-removeCollectionChan assert.True(t, removeCollectionCalled) }) diff --git a/internal/rootcoord/mocks/GarbageCollector.go b/internal/rootcoord/mocks/GarbageCollector.go new file mode 100644 index 0000000000000..e5c32c3cc644f --- /dev/null +++ b/internal/rootcoord/mocks/GarbageCollector.go @@ -0,0 +1,87 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package mockrootcoord + +import ( + context "context" + + model "github.com/milvus-io/milvus/internal/metastore/model" + mock "github.com/stretchr/testify/mock" +) + +// GarbageCollector is an autogenerated mock type for the GarbageCollector type +type GarbageCollector struct { + mock.Mock +} + +// GcCollectionData provides a mock function with given fields: ctx, coll +func (_m *GarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (uint64, error) { + ret := _m.Called(ctx, coll) + + var r0 uint64 + if rf, ok := ret.Get(0).(func(context.Context, *model.Collection) uint64); ok { + r0 = rf(ctx, coll) + } else { + r0 = ret.Get(0).(uint64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *model.Collection) error); ok { + r1 = rf(ctx, coll) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GcPartitionData provides a mock function with given fields: ctx, pChannels, partition +func (_m *GarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (uint64, error) { + ret := _m.Called(ctx, pChannels, partition) + + var r0 uint64 + if rf, ok := ret.Get(0).(func(context.Context, []string, *model.Partition) uint64); ok { + r0 = rf(ctx, pChannels, partition) + } else { + r0 = ret.Get(0).(uint64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, []string, *model.Partition) error); ok { + r1 = rf(ctx, pChannels, partition) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ReDropCollection provides a mock function with given fields: collMeta, ts +func (_m *GarbageCollector) ReDropCollection(collMeta *model.Collection, ts uint64) { + _m.Called(collMeta, ts) +} + +// ReDropPartition provides a mock function with given fields: pChannels, partition, ts +func (_m *GarbageCollector) ReDropPartition(pChannels []string, partition *model.Partition, ts uint64) { + _m.Called(pChannels, partition, ts) +} + +// RemoveCreatingCollection provides a mock function with given fields: collMeta +func (_m *GarbageCollector) RemoveCreatingCollection(collMeta *model.Collection) { + _m.Called(collMeta) +} + +type mockConstructorTestingTNewGarbageCollector interface { + mock.TestingT + Cleanup(func()) +} + +// NewGarbageCollector creates a new instance of GarbageCollector. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewGarbageCollector(t mockConstructorTestingTNewGarbageCollector) *GarbageCollector { + mock := &GarbageCollector{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index e6a07774c6437..089ada7e37c90 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -111,8 +111,11 @@ type unwatchChannelsStep struct { } func (s *unwatchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) { - err := s.core.broker.UnwatchChannels(ctx, &watchInfo{collectionID: s.collectionID, vChannels: s.channels.virtualChannels}) - return nil, err + unwatchByDropMsg := &deleteCollectionDataStep{ + baseStep: baseStep{core: s.core}, + coll: &model.Collection{CollectionID: s.collectionID, PhysicalChannelNames: s.channels.physicalChannels}, + } + return unwatchByDropMsg.Execute(ctx) } func (s *unwatchChannelsStep) Desc() string { diff --git a/internal/tso/global_allocator.go b/internal/tso/global_allocator.go index 80e97ece5f410..674cf75864207 100644 --- a/internal/tso/global_allocator.go +++ b/internal/tso/global_allocator.go @@ -42,6 +42,7 @@ import ( "go.uber.org/zap" ) +//go:generate mockery --name=Allocator --outpkg=mocktso // Allocator is a Timestamp Oracle allocator. type Allocator interface { // Initialize is used to initialize a TSO allocator. diff --git a/internal/tso/mock_global_allocator.go b/internal/tso/mock_global_allocator.go index d142f384a378a..8611233795a76 100644 --- a/internal/tso/mock_global_allocator.go +++ b/internal/tso/mock_global_allocator.go @@ -4,6 +4,8 @@ import ( "time" ) +// TODO(longjiquan): replace this by mockery. + type MockAllocator struct { Allocator InitializeF func() error diff --git a/internal/tso/mocks/Allocator.go b/internal/tso/mocks/Allocator.go new file mode 100644 index 0000000000000..ee9fb641e4ecb --- /dev/null +++ b/internal/tso/mocks/Allocator.go @@ -0,0 +1,111 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package mocktso + +import ( + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// Allocator is an autogenerated mock type for the Allocator type +type Allocator struct { + mock.Mock +} + +// GenerateTSO provides a mock function with given fields: count +func (_m *Allocator) GenerateTSO(count uint32) (uint64, error) { + ret := _m.Called(count) + + var r0 uint64 + if rf, ok := ret.Get(0).(func(uint32) uint64); ok { + r0 = rf(count) + } else { + r0 = ret.Get(0).(uint64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(uint32) error); ok { + r1 = rf(count) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetLastSavedTime provides a mock function with given fields: +func (_m *Allocator) GetLastSavedTime() time.Time { + ret := _m.Called() + + var r0 time.Time + if rf, ok := ret.Get(0).(func() time.Time); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Time) + } + + return r0 +} + +// Initialize provides a mock function with given fields: +func (_m *Allocator) Initialize() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Reset provides a mock function with given fields: +func (_m *Allocator) Reset() { + _m.Called() +} + +// SetTSO provides a mock function with given fields: _a0 +func (_m *Allocator) SetTSO(_a0 uint64) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateTSO provides a mock function with given fields: +func (_m *Allocator) UpdateTSO() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewAllocator interface { + mock.TestingT + Cleanup(func()) +} + +// NewAllocator creates a new instance of Allocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewAllocator(t mockConstructorTestingTNewAllocator) *Allocator { + mock := &Allocator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/scripts/run_go_codecov.sh b/scripts/run_go_codecov.sh index 736588e2b4e30..8a5b976258869 100755 --- a/scripts/run_go_codecov.sh +++ b/scripts/run_go_codecov.sh @@ -30,10 +30,10 @@ echo "Running unittest under ./internal" if [[ $(uname -s) == "Darwin" && "$(uname -m)" == "arm64" ]]; then APPLE_SILICON_FLAG="-tags dynamic" fi -for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e internal/mocks); do +for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do go test -race ${APPLE_SILICON_FLAG} -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then - grep -v kafka profile.out | grep -v planparserv2/generated | sed '1d' >> ${FILE_COVERAGE_INFO} + grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO} rm profile.out fi done