From 7261128df031055d9a3050065fb1b4b7f991a05c Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 21 Jan 2025 11:15:04 +0800 Subject: [PATCH] enhance: Remove not inuse binlog iterators (#39243) 1. Remove datanode/iterators/binlog_iterator 2. Remove storage/binlog_iterator/MergeIterator See also: #39242 --------- Signed-off-by: yangxuan --- .gitignore | 1 + .../datanode/iterators/binlog_iterator.go | 103 ------ .../iterators/binlog_iterator_test.go | 338 ------------------ .../iterators/deltalog_iterator_test.go | 2 +- internal/storage/binlog_iterator.go | 89 ----- internal/storage/binlog_iterator_test.go | 77 ---- 6 files changed, 2 insertions(+), 608 deletions(-) delete mode 100644 internal/datanode/iterators/binlog_iterator.go delete mode 100644 internal/datanode/iterators/binlog_iterator_test.go diff --git a/.gitignore b/.gitignore index 041d808936c0d..2e3991c4b29a6 100644 --- a/.gitignore +++ b/.gitignore @@ -101,6 +101,7 @@ cwrapper_rocksdb_build/ **/data/* internal/proto/**/*.pb.go +pkg/streaming/**/*.pb.go internal/core/src/pb/*.pb.h internal/core/src/pb/*.pb.cc **/legacypb/*.pb.go diff --git a/internal/datanode/iterators/binlog_iterator.go b/internal/datanode/iterators/binlog_iterator.go deleted file mode 100644 index 8f3d371d5439c..0000000000000 --- a/internal/datanode/iterators/binlog_iterator.go +++ /dev/null @@ -1,103 +0,0 @@ -package iterator - -import ( - "sync" - - "go.uber.org/atomic" - - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -type BinlogIterator struct { - disposed atomic.Bool - disposedCh chan struct{} - disposedOnce sync.Once - - data *storage.InsertData - label *Label - pkFieldID int64 - pkType schemapb.DataType - pos int -} - -var _ Iterator = (*BinlogIterator)(nil) - -// NewInsertBinlogIterator creates a new iterator -func NewInsertBinlogIterator(v [][]byte, pkFieldID typeutil.UniqueID, pkType schemapb.DataType, label *Label) (*BinlogIterator, error) { - blobs := make([]*storage.Blob, len(v)) - for i := range blobs { - blobs[i] = &storage.Blob{Value: v[i]} - } - - reader := storage.NewInsertCodec() - _, _, iData, err := reader.Deserialize(blobs) - if err != nil { - return nil, err - } - - return &BinlogIterator{ - disposedCh: make(chan struct{}), - data: iData, - pkFieldID: pkFieldID, - pkType: pkType, - label: label, - }, nil -} - -// HasNext returns true if the iterator have unread record -func (i *BinlogIterator) HasNext() bool { - return !i.isDisposed() && i.hasNext() -} - -func (i *BinlogIterator) Next() (*LabeledRowData, error) { - if i.isDisposed() { - return nil, ErrDisposed - } - - if !i.hasNext() { - return nil, ErrNoMoreRecord - } - - fields := make(map[int64]interface{}) - for fieldID, fieldData := range i.data.Data { - fields[fieldID] = fieldData.GetRow(i.pos) - } - - pk, err := storage.GenPrimaryKeyByRawData(i.data.Data[i.pkFieldID].GetRow(i.pos), i.pkType) - if err != nil { - return nil, err - } - - row := &InsertRow{ - ID: i.data.Data[common.RowIDField].GetRow(i.pos).(int64), - Timestamp: uint64(i.data.Data[common.TimeStampField].GetRow(i.pos).(int64)), - Pk: pk, - Value: fields, - } - i.pos++ - return NewLabeledRowData(row, i.label), nil -} - -// Dispose disposes the iterator -func (i *BinlogIterator) Dispose() { - i.disposed.CompareAndSwap(false, true) - i.disposedOnce.Do(func() { - close(i.disposedCh) - }) -} - -func (i *BinlogIterator) hasNext() bool { - return i.pos < i.data.GetRowNum() -} - -func (i *BinlogIterator) isDisposed() bool { - return i.disposed.Load() -} - -// Disposed wait forever for the iterator to dispose -func (i *BinlogIterator) WaitForDisposed() { - <-i.disposedCh -} diff --git a/internal/datanode/iterators/binlog_iterator_test.go b/internal/datanode/iterators/binlog_iterator_test.go deleted file mode 100644 index da71278972d65..0000000000000 --- a/internal/datanode/iterators/binlog_iterator_test.go +++ /dev/null @@ -1,338 +0,0 @@ -package iterator - -import ( - "testing" - - "github.com/stretchr/testify/suite" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/proto/etcdpb" -) - -func TestInsertBinlogIteratorSuite(t *testing.T) { - suite.Run(t, new(InsertBinlogIteratorSuite)) -} - -const ( - CollectionID = 10000 - PartitionID = 10001 - SegmentID = 10002 - RowIDField = 0 - TimestampField = 1 - BoolField = 100 - Int8Field = 101 - Int16Field = 102 - Int32Field = 103 - Int64Field = 104 - FloatField = 105 - DoubleField = 106 - StringField = 107 - BinaryVectorField = 108 - FloatVectorField = 109 - ArrayField = 110 - JSONField = 111 - Float16VectorField = 112 - BFloat16VectorField = 113 -) - -type InsertBinlogIteratorSuite struct { - suite.Suite - - i *BinlogIterator -} - -func (s *InsertBinlogIteratorSuite) TestBinlogIterator() { - insertData, meta := genTestInsertData() - writer := storage.NewInsertCodecWithSchema(meta) - blobs, err := writer.Serialize(PartitionID, SegmentID, insertData) - s.Require().NoError(err) - - values := [][]byte{} - for _, b := range blobs { - values = append(values, b.Value) - } - s.Run("invalid blobs", func() { - iter, err := NewInsertBinlogIterator([][]byte{}, Int64Field, schemapb.DataType_Int64, nil) - s.Error(err) - s.Nil(iter) - }) - - s.Run("invalid pk type", func() { - iter, err := NewInsertBinlogIterator(values, Int64Field, schemapb.DataType_Float, &Label{segmentID: 19530}) - s.NoError(err) - - _, err = iter.Next() - s.Error(err) - }) - - s.Run("normal", func() { - iter, err := NewInsertBinlogIterator(values, Int64Field, schemapb.DataType_Int64, &Label{segmentID: 19530}) - s.NoError(err) - - rows := []interface{}{} - var idx int = 0 // row number - - for iter.HasNext() { - labeled, err := iter.Next() - s.NoError(err) - s.Equal(int64(19530), labeled.GetSegmentID()) - - rows = append(rows, labeled.data) - - label := labeled.GetLabel() - s.NotNil(label) - s.EqualValues(19530, label.segmentID) - s.EqualValues(19530, labeled.GetSegmentID()) - - insertRow, ok := labeled.data.(*InsertRow) - s.True(ok) - - s.EqualValues(insertData.Data[TimestampField].GetRow(idx).(int64), labeled.GetTimestamp()) - s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), labeled.GetPk().GetValue().(int64)) - s.Equal(insertData.Data[RowIDField].GetRow(idx).(int64), insertRow.ID) - s.Equal(insertData.Data[BoolField].GetRow(idx).(bool), insertRow.Value[BoolField].(bool)) - s.Equal(insertData.Data[Int8Field].GetRow(idx).(int8), insertRow.Value[Int8Field].(int8)) - s.Equal(insertData.Data[Int16Field].GetRow(idx).(int16), insertRow.Value[Int16Field].(int16)) - s.Equal(insertData.Data[Int32Field].GetRow(idx).(int32), insertRow.Value[Int32Field].(int32)) - s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), insertRow.Value[Int64Field].(int64)) - s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), insertRow.Value[Int64Field].(int64)) - s.Equal(insertData.Data[FloatField].GetRow(idx).(float32), insertRow.Value[FloatField].(float32)) - s.Equal(insertData.Data[DoubleField].GetRow(idx).(float64), insertRow.Value[DoubleField].(float64)) - s.Equal(insertData.Data[StringField].GetRow(idx).(string), insertRow.Value[StringField].(string)) - s.Equal(insertData.Data[ArrayField].GetRow(idx).(*schemapb.ScalarField).GetIntData().Data, insertRow.Value[ArrayField].(*schemapb.ScalarField).GetIntData().Data) - s.Equal(insertData.Data[JSONField].GetRow(idx).([]byte), insertRow.Value[JSONField].([]byte)) - s.Equal(insertData.Data[BinaryVectorField].GetRow(idx).([]byte), insertRow.Value[BinaryVectorField].([]byte)) - s.Equal(insertData.Data[FloatVectorField].GetRow(idx).([]float32), insertRow.Value[FloatVectorField].([]float32)) - s.Equal(insertData.Data[Float16VectorField].GetRow(idx).([]byte), insertRow.Value[Float16VectorField].([]byte)) - s.Equal(insertData.Data[BFloat16VectorField].GetRow(idx).([]byte), insertRow.Value[BFloat16VectorField].([]byte)) - - idx++ - } - - s.Equal(2, len(rows)) - - _, err = iter.Next() - s.ErrorIs(err, ErrNoMoreRecord) - - iter.Dispose() - iter.WaitForDisposed() - - _, err = iter.Next() - s.ErrorIs(err, ErrDisposed) - }) -} - -func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) { - meta := &etcdpb.CollectionMeta{ - ID: CollectionID, - CreateTime: 1, - SegmentIDs: []int64{SegmentID}, - PartitionTags: []string{"partition_0", "partition_1"}, - Schema: &schemapb.CollectionSchema{ - Name: "schema", - Description: "schema", - AutoID: true, - Fields: []*schemapb.FieldSchema{ - { - FieldID: RowIDField, - Name: "row_id", - IsPrimaryKey: false, - Description: "row_id", - DataType: schemapb.DataType_Int64, - }, - { - FieldID: TimestampField, - Name: "Timestamp", - IsPrimaryKey: false, - Description: "Timestamp", - DataType: schemapb.DataType_Int64, - }, - { - FieldID: BoolField, - Name: "field_bool", - IsPrimaryKey: false, - Description: "bool", - DataType: schemapb.DataType_Bool, - }, - { - FieldID: Int8Field, - Name: "field_int8", - IsPrimaryKey: false, - Description: "int8", - DataType: schemapb.DataType_Int8, - }, - { - FieldID: Int16Field, - Name: "field_int16", - IsPrimaryKey: false, - Description: "int16", - DataType: schemapb.DataType_Int16, - }, - { - FieldID: Int32Field, - Name: "field_int32", - IsPrimaryKey: false, - Description: "int32", - DataType: schemapb.DataType_Int32, - }, - { - FieldID: Int64Field, - Name: "field_int64", - IsPrimaryKey: true, - Description: "int64", - DataType: schemapb.DataType_Int64, - }, - { - FieldID: FloatField, - Name: "field_float", - IsPrimaryKey: false, - Description: "float", - DataType: schemapb.DataType_Float, - }, - { - FieldID: DoubleField, - Name: "field_double", - IsPrimaryKey: false, - Description: "double", - DataType: schemapb.DataType_Double, - }, - { - FieldID: StringField, - Name: "field_string", - IsPrimaryKey: false, - Description: "string", - DataType: schemapb.DataType_String, - }, - { - FieldID: ArrayField, - Name: "field_int32_array", - Description: "int32 array", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_Int32, - }, - { - FieldID: JSONField, - Name: "field_json", - Description: "json", - DataType: schemapb.DataType_JSON, - }, - { - FieldID: BinaryVectorField, - Name: "field_binary_vector", - IsPrimaryKey: false, - Description: "binary_vector", - DataType: schemapb.DataType_BinaryVector, - TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "8"}, - }, - }, - { - FieldID: FloatVectorField, - Name: "field_float_vector", - IsPrimaryKey: false, - Description: "float_vector", - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "4"}, - }, - }, - { - FieldID: Float16VectorField, - Name: "field_float16_vector", - IsPrimaryKey: false, - Description: "float16_vector", - DataType: schemapb.DataType_Float16Vector, - TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "4"}, - }, - }, - { - FieldID: BFloat16VectorField, - Name: "field_bfloat16_vector", - IsPrimaryKey: false, - Description: "bfloat16_vector", - DataType: schemapb.DataType_BFloat16Vector, - TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "4"}, - }, - }, - }, - }, - } - insertData := storage.InsertData{ - Data: map[int64]storage.FieldData{ - RowIDField: &storage.Int64FieldData{ - Data: []int64{3, 4}, - }, - TimestampField: &storage.Int64FieldData{ - Data: []int64{3, 4}, - }, - BoolField: &storage.BoolFieldData{ - Data: []bool{true, false}, - }, - Int8Field: &storage.Int8FieldData{ - Data: []int8{3, 4}, - }, - Int16Field: &storage.Int16FieldData{ - Data: []int16{3, 4}, - }, - Int32Field: &storage.Int32FieldData{ - Data: []int32{3, 4}, - }, - Int64Field: &storage.Int64FieldData{ - Data: []int64{3, 4}, - }, - FloatField: &storage.FloatFieldData{ - Data: []float32{3, 4}, - }, - DoubleField: &storage.DoubleFieldData{ - Data: []float64{3, 4}, - }, - StringField: &storage.StringFieldData{ - Data: []string{"3", "4"}, - }, - BinaryVectorField: &storage.BinaryVectorFieldData{ - Data: []byte{0, 255}, - Dim: 8, - }, - FloatVectorField: &storage.FloatVectorFieldData{ - Data: []float32{4, 5, 6, 7, 4, 5, 6, 7}, - Dim: 4, - }, - ArrayField: &storage.ArrayFieldData{ - ElementType: schemapb.DataType_Int32, - Data: []*schemapb.ScalarField{ - { - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{Data: []int32{3, 2, 1}}, - }, - }, - { - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{Data: []int32{6, 5, 4}}, - }, - }, - }, - }, - JSONField: &storage.JSONFieldData{ - Data: [][]byte{ - []byte(`{"batch":2}`), - []byte(`{"key":"world"}`), - }, - }, - Float16VectorField: &storage.Float16VectorFieldData{ - Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255}, - Dim: 4, - }, - BFloat16VectorField: &storage.BFloat16VectorFieldData{ - Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255}, - Dim: 4, - }, - }, - } - - return &insertData, meta -} diff --git a/internal/datanode/iterators/deltalog_iterator_test.go b/internal/datanode/iterators/deltalog_iterator_test.go index 498b888c9b8f4..2630852b43c6c 100644 --- a/internal/datanode/iterators/deltalog_iterator_test.go +++ b/internal/datanode/iterators/deltalog_iterator_test.go @@ -33,7 +33,7 @@ func (s *DeltalogIteratorSuite) TestDeltalogIteratorIntPK() { } dCodec := storage.NewDeleteCodec() - blob, err := dCodec.Serialize(CollectionID, 1, 1, dData) + blob, err := dCodec.Serialize(10000, 1, 1, dData) s.Require().NoError(err) value := [][]byte{blob.Value} diff --git a/internal/storage/binlog_iterator.go b/internal/storage/binlog_iterator.go index 2eb291a1496ab..feecd6c21561f 100644 --- a/internal/storage/binlog_iterator.go +++ b/internal/storage/binlog_iterator.go @@ -126,92 +126,3 @@ func (itr *InsertBinlogIterator) hasNext() bool { func (itr *InsertBinlogIterator) isDisposed() bool { return atomic.LoadInt32(&itr.dispose) == 1 } - -// MergeIterator merge iterators. -type MergeIterator struct { - disposed int32 - pos int - iteraotrs []Iterator - tmpRecords []*Value - nextRecord *Value -} - -// NewMergeIterator return a new MergeIterator. -func NewMergeIterator(iterators []Iterator) *MergeIterator { - return &MergeIterator{ - iteraotrs: iterators, - tmpRecords: make([]*Value, len(iterators)), - } -} - -// HasNext returns true if the iterator have unread record -func (itr *MergeIterator) HasNext() bool { - return !itr.isDisposed() && itr.hasNext() -} - -// Next returns the next record -func (itr *MergeIterator) Next() (interface{}, error) { - if itr.isDisposed() { - return nil, ErrDisposed - } - - if !itr.hasNext() { - return nil, ErrNoMoreRecord - } - - tmpRecord := itr.nextRecord - itr.nextRecord = nil - return tmpRecord, nil -} - -// Dispose disposes the iterator -func (itr *MergeIterator) Dispose() { - if itr.isDisposed() { - return - } - - for _, tmpItr := range itr.iteraotrs { - if tmpItr != nil { - tmpItr.Dispose() - } - } - atomic.CompareAndSwapInt32(&itr.disposed, 0, 1) -} - -func (itr *MergeIterator) isDisposed() bool { - return atomic.LoadInt32(&itr.disposed) == 1 -} - -func (itr *MergeIterator) hasNext() bool { - if itr.nextRecord != nil { - return true - } - - var minRecord *Value - var minPos int - for i, tmpRecord := range itr.tmpRecords { - if tmpRecord == nil { - if itr.iteraotrs[i] != nil && itr.iteraotrs[i].HasNext() { - next, _ := itr.iteraotrs[i].Next() - itr.tmpRecords[i] = next.(*Value) - tmpRecord = itr.tmpRecords[i] - } - } - if tmpRecord == nil { - continue - } - if minRecord == nil || tmpRecord.ID < minRecord.ID { - minRecord = tmpRecord - minPos = i - } - } - - if minRecord == nil { - // all iterators have no more records - return false - } - - itr.tmpRecords[minPos] = nil - itr.nextRecord = minRecord - return true -} diff --git a/internal/storage/binlog_iterator_test.go b/internal/storage/binlog_iterator_test.go index 99f5a51e34331..ebe4f478e3067 100644 --- a/internal/storage/binlog_iterator_test.go +++ b/internal/storage/binlog_iterator_test.go @@ -279,80 +279,3 @@ func TestInsertlogIterator(t *testing.T) { assert.Equal(t, ErrNoMoreRecord, err) }) } - -func TestMergeIterator(t *testing.T) { - t.Run("empty iterators", func(t *testing.T) { - iterators := make([]Iterator, 0) - for i := 0; i < 3; i++ { - iterators = append(iterators, &InsertBinlogIterator{ - data: &InsertData{}, - }) - } - itr := NewMergeIterator(iterators) - assert.False(t, itr.HasNext()) - _, err := itr.Next() - assert.Equal(t, ErrNoMoreRecord, err) - }) - - t.Run("empty and non-empty iterators", func(t *testing.T) { - blobs, err := generateTestData(3) - assert.NoError(t, err) - insertItr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64) - assert.NoError(t, err) - iterators := []Iterator{ - &InsertBinlogIterator{data: &InsertData{}}, - insertItr, - } - - itr := NewMergeIterator(iterators) - - for i := 1; i <= 3; i++ { - assert.True(t, itr.HasNext()) - v, err := itr.Next() - assert.NoError(t, err) - value := v.(*Value) - assertTestDataInternal(t, i, value, false) - } - assert.False(t, itr.HasNext()) - _, err = itr.Next() - assert.Equal(t, ErrNoMoreRecord, err) - }) - - t.Run("non-empty iterators", func(t *testing.T) { - blobs, err := generateTestData(3) - assert.NoError(t, err) - itr1, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64) - assert.NoError(t, err) - itr2, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64) - assert.NoError(t, err) - iterators := []Iterator{itr1, itr2} - itr := NewMergeIterator(iterators) - - for i := 1; i <= 3; i++ { - for j := 0; j < 2; j++ { - assert.True(t, itr.HasNext()) - v, err := itr.Next() - assert.NoError(t, err) - value := v.(*Value) - assertTestDataInternal(t, i, value, false) - } - } - - assert.False(t, itr.HasNext()) - _, err = itr.Next() - assert.Equal(t, ErrNoMoreRecord, err) - }) - - t.Run("test dispose", func(t *testing.T) { - blobs, err := generateTestData(3) - assert.NoError(t, err) - itr1, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64) - assert.NoError(t, err) - itr := NewMergeIterator([]Iterator{itr1}) - - itr.Dispose() - assert.False(t, itr.HasNext()) - _, err = itr.Next() - assert.Equal(t, ErrDisposed, err) - }) -}