Skip to content

Commit

Permalink
clean up new logic
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Jun 24, 2024
1 parent 63b49ed commit 2f38ccd
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 34 deletions.
1 change: 0 additions & 1 deletion internal/storage/binlog_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ func generateTestData(num int) ([]*Blob, error) {
}}

blobs, err := insertCodec.Serialize(1, 1, data)
fmt.Println(blobs)
return blobs, err
}

Expand Down
2 changes: 0 additions & 2 deletions internal/storage/data_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
const (
// Ts is blob key "ts"
Ts = "ts"
// Pk is blob key "pk"
Pk = "pk"
// DDL is blob key "ddl"
DDL = "ddl"
// IndexParamsKey is blob key "indexParams"
Expand Down
23 changes: 4 additions & 19 deletions internal/storage/serde_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type StreamWriter interface {
GetRecordWriter() (RecordWriter, error)
Finalize() (*Blob, error)
WriteHeaders(w io.Writer) error
}

var _ StreamWriter = (*BinlogStreamWriter)(nil)

var _ StreamWriter = (*DeltalogStreamWriter)(nil)

var _ RecordReader = (*compositeBinlogRecordReader)(nil)

type compositeBinlogRecordReader struct {
Expand Down Expand Up @@ -304,7 +294,7 @@ func (bsw *BinlogStreamWriter) Finalize() (*Blob, error) {
bsw.rw.Close()

var b bytes.Buffer
if err := bsw.WriteHeaders(&b); err != nil {
if err := bsw.writeBinlogHeaders(&b); err != nil {
return nil, err
}
if _, err := b.Write(bsw.buf.Bytes()); err != nil {
Expand All @@ -318,7 +308,7 @@ func (bsw *BinlogStreamWriter) Finalize() (*Blob, error) {
}, nil
}

func (bsw *BinlogStreamWriter) WriteHeaders(w io.Writer) error {
func (bsw *BinlogStreamWriter) writeBinlogHeaders(w io.Writer) error {
// Write magic number
if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
return err
Expand Down Expand Up @@ -426,11 +416,6 @@ func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, se
}, batchSize), nil
}

type DeltalogStreamWriterV2 struct {
pkStreamWriter *DeltalogStreamWriter
tsStreamWriter *DeltalogStreamWriter
}

type DeltalogStreamWriter struct {
collectionID UniqueID
partitionID UniqueID
Expand Down Expand Up @@ -465,7 +450,7 @@ func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
dsw.rw.Close()

var b bytes.Buffer
if err := dsw.WriteHeaders(&b); err != nil {
if err := dsw.writeDeltalogHeaders(&b); err != nil {
return nil, err
}
if _, err := b.Write(dsw.buf.Bytes()); err != nil {
Expand All @@ -478,7 +463,7 @@ func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
}, nil
}

func (dsw *DeltalogStreamWriter) WriteHeaders(w io.Writer) error {
func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
// Write magic number
if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
return err
Expand Down
18 changes: 12 additions & 6 deletions internal/storage/serde_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ func TestBinlogDeserializeReader(t *testing.T) {
defer reader.Close()
err = reader.Next()
assert.Equal(t, io.EOF, err)

// blobs := generateTestData(t, 0)
// reader, err = NewBinlogDeserializeReader(blobs, common.RowIDField)
// assert.NoError(t, err)
// err = reader.Next()
// assert.Equal(t, io.EOF, err)
})

t.Run("test binlog deserialize", func(t *testing.T) {
t.Run("test deserialize", func(t *testing.T) {
size := 3
blobs, err := generateTestData(size)
assert.NoError(t, err)
Expand Down Expand Up @@ -116,7 +122,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test binlog serialize", func(t *testing.T) {
t.Run("test serialize", func(t *testing.T) {
size := 16
blobs, err := generateTestData(size)
assert.NoError(t, err)
Expand Down Expand Up @@ -266,7 +272,7 @@ func TestDeltalogDeserializeReader(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test deltalog deserialize", func(t *testing.T) {
t.Run("test deserialize", func(t *testing.T) {
size := 3
blob, err := generateTestDeltalogData(size, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -296,7 +302,7 @@ func TestDeltalogSerializeWriter(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test deltalog serialize", func(t *testing.T) {
t.Run("test serialize", func(t *testing.T) {
size := 16
blob, err := generateTestDeltalogData(size, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -351,7 +357,7 @@ func TestDeltalogDeserializeReaderV2(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test deltalog deserialize v2", func(t *testing.T) {
t.Run("test deserialize v2", func(t *testing.T) {
size := 3
blob, err := generateTestDeltalogData(size, true)
assert.NoError(t, err)
Expand Down Expand Up @@ -381,7 +387,7 @@ func TestDeltalogSerializeWriterV2(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test deltalog serialize v2", func(t *testing.T) {
t.Run("test serialize v2", func(t *testing.T) {
size := 16
blobs, err := generateTestDeltalogData(size, true)
assert.NoError(t, err)
Expand Down
6 changes: 0 additions & 6 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,12 @@ const (
// TimeStampField is the ID of the Timestamp field reserved by the system
TimeStampField = 1

// PrimaryKeyField is the ID of the PrimaryKey field reserved by the system
PrimaryKeyField = 2

// RowIDFieldName defines the name of the RowID field
RowIDFieldName = "RowID"

// TimeStampFieldName defines the name of the Timestamp field
TimeStampFieldName = "Timestamp"

// PrimaryKeyFieldName defines the name of the Timestamp field
PrimaryKeyFieldName = "PrimaryKey"

// MetaFieldName is the field name of dynamic schema
MetaFieldName = "$meta"

Expand Down

0 comments on commit 2f38ccd

Please sign in to comment.