diff --git a/internal/storage/binlog_iterator_test.go b/internal/storage/binlog_iterator_test.go index fcb7cff0e1ee2..d387e0fedc2e7 100644 --- a/internal/storage/binlog_iterator_test.go +++ b/internal/storage/binlog_iterator_test.go @@ -171,7 +171,6 @@ func generateTestData(num int) ([]*Blob, error) { }} blobs, err := insertCodec.Serialize(1, 1, data) - fmt.Println(blobs) return blobs, err } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 82ece3f2b2cc1..7c19da6ba4d63 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -35,8 +35,6 @@ import ( const ( // Ts is blob key "ts" Ts = "ts" - // Pk is blob key "pk" - Pk = "pk" // DDL is blob key "ddl" DDL = "ddl" // IndexParamsKey is blob key "indexParams" diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go index ada9db8806483..7655d1252e49d 100644 --- a/internal/storage/serde_events.go +++ b/internal/storage/serde_events.go @@ -36,16 +36,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -type StreamWriter interface { - GetRecordWriter() (RecordWriter, error) - Finalize() (*Blob, error) - WriteHeaders(w io.Writer) error -} - -var _ StreamWriter = (*BinlogStreamWriter)(nil) - -var _ StreamWriter = (*DeltalogStreamWriter)(nil) - var _ RecordReader = (*compositeBinlogRecordReader)(nil) type compositeBinlogRecordReader struct { @@ -304,7 +294,7 @@ func (bsw *BinlogStreamWriter) Finalize() (*Blob, error) { bsw.rw.Close() var b bytes.Buffer - if err := bsw.WriteHeaders(&b); err != nil { + if err := bsw.writeBinlogHeaders(&b); err != nil { return nil, err } if _, err := b.Write(bsw.buf.Bytes()); err != nil { @@ -318,7 +308,7 @@ func (bsw *BinlogStreamWriter) Finalize() (*Blob, error) { }, nil } -func (bsw *BinlogStreamWriter) WriteHeaders(w io.Writer) error { +func (bsw *BinlogStreamWriter) writeBinlogHeaders(w io.Writer) error { // Write magic number if err := binary.Write(w, common.Endian, MagicNumber); err != nil { return err @@ -426,11 +416,6 @@ func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, se }, batchSize), nil } -type DeltalogStreamWriterV2 struct { - pkStreamWriter *DeltalogStreamWriter - tsStreamWriter *DeltalogStreamWriter -} - type DeltalogStreamWriter struct { collectionID UniqueID partitionID UniqueID @@ -465,7 +450,7 @@ func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) { dsw.rw.Close() var b bytes.Buffer - if err := dsw.WriteHeaders(&b); err != nil { + if err := dsw.writeDeltalogHeaders(&b); err != nil { return nil, err } if _, err := b.Write(dsw.buf.Bytes()); err != nil { @@ -478,7 +463,7 @@ func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) { }, nil } -func (dsw *DeltalogStreamWriter) WriteHeaders(w io.Writer) error { +func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error { // Write magic number if err := binary.Write(w, common.Endian, MagicNumber); err != nil { return err diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go index 651f660f30433..25aee6adbbc69 100644 --- a/internal/storage/serde_events_test.go +++ b/internal/storage/serde_events_test.go @@ -40,9 +40,15 @@ func TestBinlogDeserializeReader(t *testing.T) { defer reader.Close() err = reader.Next() assert.Equal(t, io.EOF, err) + + // blobs := generateTestData(t, 0) + // reader, err = NewBinlogDeserializeReader(blobs, common.RowIDField) + // assert.NoError(t, err) + // err = reader.Next() + // assert.Equal(t, io.EOF, err) }) - t.Run("test binlog deserialize", func(t *testing.T) { + t.Run("test deserialize", func(t *testing.T) { size := 3 blobs, err := generateTestData(size) assert.NoError(t, err) @@ -116,7 +122,7 @@ func TestBinlogSerializeWriter(t *testing.T) { assert.Equal(t, io.EOF, err) }) - t.Run("test binlog serialize", func(t *testing.T) { + t.Run("test serialize", func(t *testing.T) { size := 16 blobs, err := generateTestData(size) assert.NoError(t, err) @@ -266,7 +272,7 @@ func TestDeltalogDeserializeReader(t *testing.T) { assert.Equal(t, io.EOF, err) }) - t.Run("test deltalog deserialize", func(t *testing.T) { + t.Run("test deserialize", func(t *testing.T) { size := 3 blob, err := generateTestDeltalogData(size, false) assert.NoError(t, err) @@ -296,7 +302,7 @@ func TestDeltalogSerializeWriter(t *testing.T) { assert.Equal(t, io.EOF, err) }) - t.Run("test deltalog serialize", func(t *testing.T) { + t.Run("test serialize", func(t *testing.T) { size := 16 blob, err := generateTestDeltalogData(size, false) assert.NoError(t, err) @@ -351,7 +357,7 @@ func TestDeltalogDeserializeReaderV2(t *testing.T) { assert.Equal(t, io.EOF, err) }) - t.Run("test deltalog deserialize v2", func(t *testing.T) { + t.Run("test deserialize v2", func(t *testing.T) { size := 3 blob, err := generateTestDeltalogData(size, true) assert.NoError(t, err) @@ -381,7 +387,7 @@ func TestDeltalogSerializeWriterV2(t *testing.T) { assert.Equal(t, io.EOF, err) }) - t.Run("test deltalog serialize v2", func(t *testing.T) { + t.Run("test serialize v2", func(t *testing.T) { size := 16 blobs, err := generateTestDeltalogData(size, true) assert.NoError(t, err) diff --git a/pkg/common/common.go b/pkg/common/common.go index 0290d6c376fb5..5bd3aabc3c1d5 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -43,18 +43,12 @@ const ( // TimeStampField is the ID of the Timestamp field reserved by the system TimeStampField = 1 - // PrimaryKeyField is the ID of the PrimaryKey field reserved by the system - PrimaryKeyField = 2 - // RowIDFieldName defines the name of the RowID field RowIDFieldName = "RowID" // TimeStampFieldName defines the name of the Timestamp field TimeStampFieldName = "Timestamp" - // PrimaryKeyFieldName defines the name of the Timestamp field - PrimaryKeyFieldName = "PrimaryKey" - // MetaFieldName is the field name of dynamic schema MetaFieldName = "$meta"