diff --git a/Makefile b/Makefile index 6b3ed0746a722..afdb43ca96f42 100644 --- a/Makefile +++ b/Makefile @@ -259,3 +259,9 @@ rpm: install @cp -r configs ~/rpmbuild/BUILD/ @cp -r build/rpm/services ~/rpmbuild/BUILD/ @QA_RPATHS="$$[ 0x001|0x0002|0x0020 ]" rpmbuild -ba ./build/rpm/milvus.spec + +mock-datanode: + mockery --name=DataNode --dir=$(PWD)/internal/types --output=$(PWD)/internal/mocks --filename=mock_datanode.go --with-expecter + +mock-tnx-kv: + mockery --name=TxnKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=TxnKV.go --with-expecter diff --git a/internal/kv/mocks/TxnKV.go b/internal/kv/mocks/TxnKV.go index 9197f72bcd53d..f9a958f55b448 100644 --- a/internal/kv/mocks/TxnKV.go +++ b/internal/kv/mocks/TxnKV.go @@ -9,11 +9,41 @@ type TxnKV struct { mock.Mock } +type TxnKV_Expecter struct { + mock *mock.Mock +} + +func (_m *TxnKV) EXPECT() *TxnKV_Expecter { + return &TxnKV_Expecter{mock: &_m.Mock} +} + // Close provides a mock function with given fields: func (_m *TxnKV) Close() { _m.Called() } +// TxnKV_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type TxnKV_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *TxnKV_Expecter) Close() *TxnKV_Close_Call { + return &TxnKV_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *TxnKV_Close_Call) Run(run func()) *TxnKV_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *TxnKV_Close_Call) Return() *TxnKV_Close_Call { + _c.Call.Return() + return _c +} + // Load provides a mock function with given fields: key func (_m *TxnKV) Load(key string) (string, error) { ret := _m.Called(key) @@ -35,6 +65,29 @@ func (_m *TxnKV) Load(key string) (string, error) { return r0, r1 } +// TxnKV_Load_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Load' +type TxnKV_Load_Call struct { + *mock.Call +} + +// Load is a helper method to define mock.On call +// - key string +func (_e *TxnKV_Expecter) Load(key interface{}) *TxnKV_Load_Call { + return &TxnKV_Load_Call{Call: _e.mock.On("Load", key)} +} + +func (_c *TxnKV_Load_Call) Run(run func(key string)) *TxnKV_Load_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *TxnKV_Load_Call) Return(_a0 string, _a1 error) *TxnKV_Load_Call { + _c.Call.Return(_a0, _a1) + return _c +} + // LoadWithPrefix provides a mock function with given fields: key func (_m *TxnKV) LoadWithPrefix(key string) ([]string, []string, error) { ret := _m.Called(key) @@ -67,6 +120,29 @@ func (_m *TxnKV) LoadWithPrefix(key string) ([]string, []string, error) { return r0, r1, r2 } +// TxnKV_LoadWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadWithPrefix' +type TxnKV_LoadWithPrefix_Call struct { + *mock.Call +} + +// LoadWithPrefix is a helper method to define mock.On call +// - key string +func (_e *TxnKV_Expecter) LoadWithPrefix(key interface{}) *TxnKV_LoadWithPrefix_Call { + return &TxnKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key)} +} + +func (_c *TxnKV_LoadWithPrefix_Call) Run(run func(key string)) *TxnKV_LoadWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *TxnKV_LoadWithPrefix_Call) Return(_a0 []string, _a1 []string, _a2 error) *TxnKV_LoadWithPrefix_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + // MultiLoad provides a mock function with given fields: keys func (_m *TxnKV) MultiLoad(keys []string) ([]string, error) { ret := _m.Called(keys) @@ -90,6 +166,29 @@ func (_m *TxnKV) MultiLoad(keys []string) ([]string, error) { return r0, r1 } +// TxnKV_MultiLoad_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiLoad' +type TxnKV_MultiLoad_Call struct { + *mock.Call +} + +// MultiLoad is a helper method to define mock.On call +// - keys []string +func (_e *TxnKV_Expecter) MultiLoad(keys interface{}) *TxnKV_MultiLoad_Call { + return &TxnKV_MultiLoad_Call{Call: _e.mock.On("MultiLoad", keys)} +} + +func (_c *TxnKV_MultiLoad_Call) Run(run func(keys []string)) *TxnKV_MultiLoad_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]string)) + }) + return _c +} + +func (_c *TxnKV_MultiLoad_Call) Return(_a0 []string, _a1 error) *TxnKV_MultiLoad_Call { + _c.Call.Return(_a0, _a1) + return _c +} + // MultiRemove provides a mock function with given fields: keys func (_m *TxnKV) MultiRemove(keys []string) error { ret := _m.Called(keys) @@ -104,6 +203,29 @@ func (_m *TxnKV) MultiRemove(keys []string) error { return r0 } +// TxnKV_MultiRemove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiRemove' +type TxnKV_MultiRemove_Call struct { + *mock.Call +} + +// MultiRemove is a helper method to define mock.On call +// - keys []string +func (_e *TxnKV_Expecter) MultiRemove(keys interface{}) *TxnKV_MultiRemove_Call { + return &TxnKV_MultiRemove_Call{Call: _e.mock.On("MultiRemove", keys)} +} + +func (_c *TxnKV_MultiRemove_Call) Run(run func(keys []string)) *TxnKV_MultiRemove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]string)) + }) + return _c +} + +func (_c *TxnKV_MultiRemove_Call) Return(_a0 error) *TxnKV_MultiRemove_Call { + _c.Call.Return(_a0) + return _c +} + // MultiRemoveWithPrefix provides a mock function with given fields: keys func (_m *TxnKV) MultiRemoveWithPrefix(keys []string) error { ret := _m.Called(keys) @@ -118,6 +240,29 @@ func (_m *TxnKV) MultiRemoveWithPrefix(keys []string) error { return r0 } +// TxnKV_MultiRemoveWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiRemoveWithPrefix' +type TxnKV_MultiRemoveWithPrefix_Call struct { + *mock.Call +} + +// MultiRemoveWithPrefix is a helper method to define mock.On call +// - keys []string +func (_e *TxnKV_Expecter) MultiRemoveWithPrefix(keys interface{}) *TxnKV_MultiRemoveWithPrefix_Call { + return &TxnKV_MultiRemoveWithPrefix_Call{Call: _e.mock.On("MultiRemoveWithPrefix", keys)} +} + +func (_c *TxnKV_MultiRemoveWithPrefix_Call) Run(run func(keys []string)) *TxnKV_MultiRemoveWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]string)) + }) + return _c +} + +func (_c *TxnKV_MultiRemoveWithPrefix_Call) Return(_a0 error) *TxnKV_MultiRemoveWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + // MultiSave provides a mock function with given fields: kvs func (_m *TxnKV) MultiSave(kvs map[string]string) error { ret := _m.Called(kvs) @@ -132,6 +277,29 @@ func (_m *TxnKV) MultiSave(kvs map[string]string) error { return r0 } +// TxnKV_MultiSave_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSave' +type TxnKV_MultiSave_Call struct { + *mock.Call +} + +// MultiSave is a helper method to define mock.On call +// - kvs map[string]string +func (_e *TxnKV_Expecter) MultiSave(kvs interface{}) *TxnKV_MultiSave_Call { + return &TxnKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs)} +} + +func (_c *TxnKV_MultiSave_Call) Run(run func(kvs map[string]string)) *TxnKV_MultiSave_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[string]string)) + }) + return _c +} + +func (_c *TxnKV_MultiSave_Call) Return(_a0 error) *TxnKV_MultiSave_Call { + _c.Call.Return(_a0) + return _c +} + // MultiSaveAndRemove provides a mock function with given fields: saves, removals func (_m *TxnKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { ret := _m.Called(saves, removals) @@ -146,6 +314,30 @@ func (_m *TxnKV) MultiSaveAndRemove(saves map[string]string, removals []string) return r0 } +// TxnKV_MultiSaveAndRemove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSaveAndRemove' +type TxnKV_MultiSaveAndRemove_Call struct { + *mock.Call +} + +// MultiSaveAndRemove is a helper method to define mock.On call +// - saves map[string]string +// - removals []string +func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemove_Call { + return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)} +} + +func (_c *TxnKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string)) *TxnKV_MultiSaveAndRemove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[string]string), args[1].([]string)) + }) + return _c +} + +func (_c *TxnKV_MultiSaveAndRemove_Call) Return(_a0 error) *TxnKV_MultiSaveAndRemove_Call { + _c.Call.Return(_a0) + return _c +} + // MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals func (_m *TxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { ret := _m.Called(saves, removals) @@ -160,6 +352,30 @@ func (_m *TxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals return r0 } +// TxnKV_MultiSaveAndRemoveWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSaveAndRemoveWithPrefix' +type TxnKV_MultiSaveAndRemoveWithPrefix_Call struct { + *mock.Call +} + +// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call +// - saves map[string]string +// - removals []string +func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { + return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)} +} + +func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string)) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[string]string), args[1].([]string)) + }) + return _c +} + +func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + // Remove provides a mock function with given fields: key func (_m *TxnKV) Remove(key string) error { ret := _m.Called(key) @@ -174,6 +390,29 @@ func (_m *TxnKV) Remove(key string) error { return r0 } +// TxnKV_Remove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Remove' +type TxnKV_Remove_Call struct { + *mock.Call +} + +// Remove is a helper method to define mock.On call +// - key string +func (_e *TxnKV_Expecter) Remove(key interface{}) *TxnKV_Remove_Call { + return &TxnKV_Remove_Call{Call: _e.mock.On("Remove", key)} +} + +func (_c *TxnKV_Remove_Call) Run(run func(key string)) *TxnKV_Remove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *TxnKV_Remove_Call) Return(_a0 error) *TxnKV_Remove_Call { + _c.Call.Return(_a0) + return _c +} + // RemoveWithPrefix provides a mock function with given fields: key func (_m *TxnKV) RemoveWithPrefix(key string) error { ret := _m.Called(key) @@ -188,6 +427,29 @@ func (_m *TxnKV) RemoveWithPrefix(key string) error { return r0 } +// TxnKV_RemoveWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveWithPrefix' +type TxnKV_RemoveWithPrefix_Call struct { + *mock.Call +} + +// RemoveWithPrefix is a helper method to define mock.On call +// - key string +func (_e *TxnKV_Expecter) RemoveWithPrefix(key interface{}) *TxnKV_RemoveWithPrefix_Call { + return &TxnKV_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", key)} +} + +func (_c *TxnKV_RemoveWithPrefix_Call) Run(run func(key string)) *TxnKV_RemoveWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *TxnKV_RemoveWithPrefix_Call) Return(_a0 error) *TxnKV_RemoveWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + // Save provides a mock function with given fields: key, value func (_m *TxnKV) Save(key string, value string) error { ret := _m.Called(key, value) @@ -202,6 +464,30 @@ func (_m *TxnKV) Save(key string, value string) error { return r0 } +// TxnKV_Save_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Save' +type TxnKV_Save_Call struct { + *mock.Call +} + +// Save is a helper method to define mock.On call +// - key string +// - value string +func (_e *TxnKV_Expecter) Save(key interface{}, value interface{}) *TxnKV_Save_Call { + return &TxnKV_Save_Call{Call: _e.mock.On("Save", key, value)} +} + +func (_c *TxnKV_Save_Call) Run(run func(key string, value string)) *TxnKV_Save_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string)) + }) + return _c +} + +func (_c *TxnKV_Save_Call) Return(_a0 error) *TxnKV_Save_Call { + _c.Call.Return(_a0) + return _c +} + type mockConstructorTestingTNewTxnKV interface { mock.TestingT Cleanup(func()) diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index 97d5bb31d8a07..cbb9a0a1b760e 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -79,6 +79,8 @@ type DataCoordCatalog interface { MarkChannelDeleted(ctx context.Context, channel string) error IsChannelDropped(ctx context.Context, channel string) bool DropChannel(ctx context.Context, channel string) error + + RevertAlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error } type IndexCoordCatalog interface { diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index a07e3783b549a..890f4b3a043e7 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -172,6 +172,47 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [ return nil } +// RevertAlterSegmentsAndAddNewSegment reverts the metastore operation of AtlerSegmentsAndAddNewSegment +func (kc *Catalog) RevertAlterSegmentsAndAddNewSegment(ctx context.Context, oldSegments []*datapb.SegmentInfo, removeSegment *datapb.SegmentInfo) error { + var ( + data = make(map[string]string) + removals []string + ) + + for _, s := range oldSegments { + k, v, err := buildSegmentKeyValuePair(s) + if err != nil { + return err + } + data[k] = v + } + + if removeSegment.NumOfRows > 0 { + // get all binlog keys + binlogKvs, err := buildBinlogKvPair(removeSegment) + if err != nil { + return err + } + binlogKeys := typeutil.GetMapKeys(binlogKvs) + removals = append(removals, binlogKeys...) + + // get segment key + k, _, err := buildSegmentKeyValuePair(removeSegment) + if err != nil { + return err + } + removals = append(removals, k) + } + + err := kc.Txn.MultiSaveAndRemove(data, removals) + if err != nil { + log.Warn("batch save and remove segments failed", zap.Error(err)) + return err + } + + return nil +} + func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error { kvs := make(map[string]string) batchIDs := make([]int64, 0, maxOperationsPerTxn) diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 224e77a0b29bc..16f7447e98225 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -6,25 +6,13 @@ import ( "math/rand" "testing" - "github.com/milvus-io/milvus/internal/kv" + "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) -type MockedTxnKV struct { - kv.TxnKV - multiSave func(kvs map[string]string) error - save func(key, value string) error -} - -func (mc *MockedTxnKV) MultiSave(kvs map[string]string) error { - return mc.multiSave(kvs) -} - -func (mc *MockedTxnKV) Save(key, value string) error { - return mc.save(key, value) -} - var ( segments = []*datapb.SegmentInfo{ { @@ -39,22 +27,103 @@ var ( } ) -func Test_AlterSegmentsAndAddNewSegment_SaveError(t *testing.T) { - txn := &MockedTxnKV{} - txn.multiSave = func(kvs map[string]string) error { - return errors.New("error") - } +func TestCatalog_AlterSegmentsAndAddNewSegment(t *testing.T) { + t.Run("save error", func(t *testing.T) { + txn := &mocks.TxnKV{} + txn.EXPECT().MultiSave(mock.Anything).Return(errors.New("mock error")) - catalog := &Catalog{txn} - err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), segments, newSegment) - assert.Error(t, err) + catalog := &Catalog{txn} + err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), segments, newSegment) + assert.Error(t, err) + }) + + t.Run("numRow>0", func(t *testing.T) { + txn := &mocks.TxnKV{} + txn.EXPECT().MultiSave(mock.Anything).Return(nil) + + toAlter := []*datapb.SegmentInfo{ + { + CollectionID: 100, + PartitionID: 10, + ID: 1, + }, + } + + newSeg := &datapb.SegmentInfo{ + Binlogs: []*datapb.FieldBinlog{ + { + FieldID: 101, + Binlogs: []*datapb.Binlog{}, + }, + }, + Deltalogs: []*datapb.FieldBinlog{ + { + FieldID: 101, + Binlogs: []*datapb.Binlog{}, + }, + }, + CollectionID: 100, + PartitionID: 10, + ID: 2, + NumOfRows: 15, + } + + catalog := &Catalog{txn} + err := catalog.AlterSegmentsAndAddNewSegment(context.Background(), toAlter, newSeg) + assert.NoError(t, err) + }) +} + +func TestCatalog_RevertAlterSegmentsAndAddNewSegment(t *testing.T) { + t.Run("save error", func(t *testing.T) { + txn := &mocks.TxnKV{} + txn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Return(errors.New("mock error")) + + catalog := &Catalog{txn} + err := catalog.RevertAlterSegmentsAndAddNewSegment(context.TODO(), segments, newSegment) + assert.Error(t, err) + }) + + t.Run("numRow>0", func(t *testing.T) { + txn := &mocks.TxnKV{} + txn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Return(nil) + + toAlter := []*datapb.SegmentInfo{ + { + CollectionID: 100, + PartitionID: 10, + ID: 1, + }, + } + + newSeg := &datapb.SegmentInfo{ + Binlogs: []*datapb.FieldBinlog{ + { + FieldID: 101, + Binlogs: []*datapb.Binlog{}, + }, + }, + Deltalogs: []*datapb.FieldBinlog{ + { + FieldID: 101, + Binlogs: []*datapb.Binlog{}, + }, + }, + CollectionID: 100, + PartitionID: 10, + ID: 2, + NumOfRows: 15, + } + + catalog := &Catalog{txn} + err := catalog.RevertAlterSegmentsAndAddNewSegment(context.Background(), toAlter, newSeg) + assert.NoError(t, err) + }) } func Test_SaveDroppedSegmentsInBatch_SaveError(t *testing.T) { - txn := &MockedTxnKV{} - txn.multiSave = func(kvs map[string]string) error { - return errors.New("error") - } + txn := &mocks.TxnKV{} + txn.EXPECT().MultiSave(mock.Anything).Return(errors.New("mock error")) catalog := &Catalog{txn} segments := []*datapb.SegmentInfo{ @@ -68,14 +137,18 @@ func Test_SaveDroppedSegmentsInBatch_SaveError(t *testing.T) { } func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) { - txn := &MockedTxnKV{} - count := 0 - kvSize := 0 - txn.multiSave = func(kvs map[string]string) error { - count++ - kvSize += len(kvs) - return nil - } + var ( + count = 0 + kvSize = 0 + ) + txn := &mocks.TxnKV{} + txn.EXPECT(). + MultiSave(mock.Anything). + Run(func(kvs map[string]string) { + count++ + kvSize += len(kvs) + }). + Return(nil) catalog := &Catalog{txn} @@ -149,10 +222,8 @@ func randomString(len int) string { } func Test_MarkChannelDeleted_SaveError(t *testing.T) { - txn := &MockedTxnKV{} - txn.save = func(key, value string) error { - return errors.New("error") - } + txn := &mocks.TxnKV{} + txn.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("mock error")) catalog := &Catalog{txn} err := catalog.MarkChannelDeleted(context.TODO(), "test_channel_1")