Skip to content

Commit

Permalink
UnWatch channels if failed to create collection (milvus-io#19390)
Browse files Browse the repository at this point in the history
Signed-off-by: longjiquan <[email protected]>

Signed-off-by: longjiquan <[email protected]>
  • Loading branch information
longjiquan authored Sep 23, 2022
1 parent d4bc004 commit 22477d4
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 42 deletions.
15 changes: 9 additions & 6 deletions internal/rootcoord/create_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,14 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
}, &deleteCollectionMetaStep{
baseStep: baseStep{core: t.core},
collectionID: collID,
ts: ts,
// When we undo createCollectionTask, this ts may be less than the ts when unwatch channels.
ts: ts,
})
// serve for this case: watching channels succeed in datacoord but failed due to network failure.
undoTask.AddStep(&nullStep{}, &unwatchChannelsStep{
baseStep: baseStep{core: t.core},
collectionID: collID,
channels: t.channels,
})
undoTask.AddStep(&watchChannelsStep{
baseStep: baseStep{core: t.core},
Expand All @@ -278,11 +285,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
vChannels: t.channels.virtualChannels,
startPositions: toKeyDataPairs(startPositions),
},
}, &unwatchChannelsStep{
baseStep: baseStep{core: t.core},
collectionID: collID,
channels: t.channels,
})
}, &nullStep{})
undoTask.AddStep(&changeCollectionStateStep{
baseStep: baseStep{core: t.core},
collectionID: collID,
Expand Down
19 changes: 16 additions & 3 deletions internal/rootcoord/create_collection_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"testing"
"time"

mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"

"github.com/stretchr/testify/mock"

"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
Expand Down Expand Up @@ -446,17 +450,26 @@ func Test_createCollectionTask_Execute(t *testing.T) {
broker.WatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
return nil
}

unwatchChannelsCalled := false
unwatchChannelsChan := make(chan struct{}, 1)
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
gc := mockrootcoord.NewGarbageCollector(t)
gc.On("GcCollectionData",
mock.Anything, // context.Context
mock.Anything, // *model.Collection
).Return(func(ctx context.Context, collection *model.Collection) (ddlTs Timestamp) {
for _, pchan := range pchans {
ticker.syncedTtHistogram.update(pchan, 101)
}
unwatchChannelsCalled = true
unwatchChannelsChan <- struct{}{}
return nil
}
return 100
}, nil)

core := newTestCore(withValidIDAllocator(),
withMeta(meta),
withTtSynchronizer(ticker),
withGarbageCollector(gc),
withBroker(broker))

schema := &schemapb.CollectionSchema{
Expand Down
12 changes: 11 additions & 1 deletion internal/rootcoord/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
)

//go:generate mockery --name=GarbageCollector --outpkg=mockrootcoord
type GarbageCollector interface {
ReDropCollection(collMeta *model.Collection, ts Timestamp)
RemoveCreatingCollection(collMeta *model.Collection)
Expand Down Expand Up @@ -62,7 +63,11 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim
}

func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection) {
// TODO: remove this after data gc can be notified by rpc.
c.s.chanTimeTick.addDmlChannels(collMeta.PhysicalChannelNames...)

redo := newBaseRedoTask(c.s.stepExecutor)

redo.AddAsyncStep(&unwatchChannelsStep{
baseStep: baseStep{core: c.s},
collectionID: collMeta.CollectionID,
Expand All @@ -71,10 +76,15 @@ func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection
physicalChannels: collMeta.PhysicalChannelNames,
},
})
redo.AddAsyncStep(&removeDmlChannelsStep{
baseStep: baseStep{core: c.s},
pChannels: collMeta.PhysicalChannelNames,
})
redo.AddAsyncStep(&deleteCollectionMetaStep{
baseStep: baseStep{core: c.s},
collectionID: collMeta.CollectionID,
ts: collMeta.CreateTime,
// When we undo createCollectionTask, this ts may be less than the ts when unwatch channels.
ts: collMeta.CreateTime,
})
// err is ignored since no sync steps will be executed.
_ = redo.Execute(context.Background())
Expand Down
104 changes: 76 additions & 28 deletions internal/rootcoord/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package rootcoord
import (
"context"
"errors"
"fmt"
"testing"

mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
"github.com/stretchr/testify/mock"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/internal/metastore/model"
Expand Down Expand Up @@ -164,45 +168,87 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {

func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) {
t.Run("failed to UnwatchChannels", func(t *testing.T) {
broker := newMockBroker()
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
return errors.New("error mock UnwatchChannels")
defer cleanTestEnv()

shardNum := 2

ticker := newRocksMqTtSynchronizer()
pchans := ticker.getDmlChannelNames(shardNum)

tsoAllocator := mocktso.NewAllocator(t)
tsoAllocator.
On("GenerateTSO", mock.AnythingOfType("uint32")).
Return(Timestamp(0), errors.New("error mock GenerateTSO"))

executed := make(chan struct{}, 1)
executor := newMockStepExecutor()
executor.AddStepsFunc = func(s *stepStack) {
s.Execute(context.Background())
executed <- struct{}{}
}
core := newTestCore(withBroker(broker))

core := newTestCore(withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withStepExecutor(executor))
gc := newBgGarbageCollector(core)
core.ddlTsLockManager = newDdlTsLockManagerV2(tsoAllocator)
core.garbageCollector = gc
gc.RemoveCreatingCollection(&model.Collection{})

gc.RemoveCreatingCollection(&model.Collection{PhysicalChannelNames: pchans})
<-executed
})

t.Run("failed to RemoveCollection", func(t *testing.T) {
broker := newMockBroker()
unwatchChannelsCalled := false
unwatchChannelsChan := make(chan struct{}, 1)
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
unwatchChannelsCalled = true
unwatchChannelsChan <- struct{}{}
return nil
defer cleanTestEnv()

shardNum := 2

ticker := newRocksMqTtSynchronizer()
pchans := ticker.getDmlChannelNames(shardNum)

tsoAllocator := mocktso.NewAllocator(t)
tsoAllocator.
On("GenerateTSO", mock.AnythingOfType("uint32")).
Return(Timestamp(100), nil)

for _, pchan := range pchans {
ticker.syncedTtHistogram.update(pchan, 101)
}

meta := newMockMetaTable()
removeCollectionCalled := false
removeCollectionChan := make(chan struct{}, 1)
meta.RemoveCollectionFunc = func(ctx context.Context, collectionID UniqueID, ts Timestamp) error {
return errors.New("error mock RemoveCollection")
removeCollectionCalled = true
removeCollectionChan <- struct{}{}
return fmt.Errorf("error mock RemoveCollection")
}
core := newTestCore(withBroker(broker), withMeta(meta))

core := newTestCore(withTtSynchronizer(ticker), withMeta(meta), withTsoAllocator(tsoAllocator))
gc := newBgGarbageCollector(core)
gc.RemoveCreatingCollection(&model.Collection{})
<-unwatchChannelsChan
assert.True(t, unwatchChannelsCalled)
core.ddlTsLockManager = newDdlTsLockManagerV2(tsoAllocator)
core.garbageCollector = gc

gc.RemoveCreatingCollection(&model.Collection{PhysicalChannelNames: pchans})
<-removeCollectionChan
assert.True(t, removeCollectionCalled) // though it fail.
})

t.Run("normal case", func(t *testing.T) {
broker := newMockBroker()
unwatchChannelsCalled := false
unwatchChannelsChan := make(chan struct{}, 1)
broker.UnwatchChannelsFunc = func(ctx context.Context, info *watchInfo) error {
unwatchChannelsCalled = true
unwatchChannelsChan <- struct{}{}
return nil
defer cleanTestEnv()

shardNum := 2

ticker := newRocksMqTtSynchronizer()
pchans := ticker.getDmlChannelNames(shardNum)

tsoAllocator := mocktso.NewAllocator(t)
tsoAllocator.
On("GenerateTSO", mock.AnythingOfType("uint32")).
Return(Timestamp(100), nil)

for _, pchan := range pchans {
ticker.syncedTtHistogram.update(pchan, 101)
}

meta := newMockMetaTable()
removeCollectionCalled := false
removeCollectionChan := make(chan struct{}, 1)
Expand All @@ -211,11 +257,13 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) {
removeCollectionChan <- struct{}{}
return nil
}
core := newTestCore(withBroker(broker), withMeta(meta))

core := newTestCore(withTtSynchronizer(ticker), withMeta(meta), withTsoAllocator(tsoAllocator))
gc := newBgGarbageCollector(core)
gc.RemoveCreatingCollection(&model.Collection{})
<-unwatchChannelsChan
assert.True(t, unwatchChannelsCalled)
core.ddlTsLockManager = newDdlTsLockManagerV2(tsoAllocator)
core.garbageCollector = gc

gc.RemoveCreatingCollection(&model.Collection{PhysicalChannelNames: pchans})
<-removeCollectionChan
assert.True(t, removeCollectionCalled)
})
Expand Down
87 changes: 87 additions & 0 deletions internal/rootcoord/mocks/GarbageCollector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions internal/rootcoord/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ type unwatchChannelsStep struct {
}

func (s *unwatchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.broker.UnwatchChannels(ctx, &watchInfo{collectionID: s.collectionID, vChannels: s.channels.virtualChannels})
return nil, err
unwatchByDropMsg := &deleteCollectionDataStep{
baseStep: baseStep{core: s.core},
coll: &model.Collection{CollectionID: s.collectionID, PhysicalChannelNames: s.channels.physicalChannels},
}
return unwatchByDropMsg.Execute(ctx)
}

func (s *unwatchChannelsStep) Desc() string {
Expand Down
1 change: 1 addition & 0 deletions internal/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.uber.org/zap"
)

//go:generate mockery --name=Allocator --outpkg=mocktso
// Allocator is a Timestamp Oracle allocator.
type Allocator interface {
// Initialize is used to initialize a TSO allocator.
Expand Down
2 changes: 2 additions & 0 deletions internal/tso/mock_global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"
)

// TODO(longjiquan): replace this by mockery.

type MockAllocator struct {
Allocator
InitializeF func() error
Expand Down
Loading

0 comments on commit 22477d4

Please sign in to comment.