clean up new logic
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Jun 24, 2024
1 parent b7fdb77 commit 63b49ed
Showing 4 changed files with 68 additions and 90 deletions.
1 change: 1 addition & 0 deletions internal/storage/binlog_iterator_test.go
@@ -171,6 +171,7 @@ func generateTestData(num int) ([]*Blob, error) {
 	}}
 
 	blobs, err := insertCodec.Serialize(1, 1, data)
+	fmt.Println(blobs)
 	return blobs, err
 }

103 changes: 38 additions & 65 deletions internal/storage/data_codec.go
@@ -777,23 +777,12 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
 // For each delete message, it will save pk and ts string to binlog separately,
 // to avoid json marshal.
 func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, data *DeleteData) ([]*Blob, error) {
+	var blobs []*Blob
+	var writer *DeleteBinlogWriter
 	length := len(data.Pks)
 	if length != len(data.Tss) {
 		return nil, fmt.Errorf("the length of pks, and TimeStamps is not equal")
 	}
 
-	// write pk
-	writer := NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID)
-	eventWriter, err := writer.NextDeleteEventWriter()
-	if err != nil {
-		writer.Close()
-		return nil, err
-	}
-	defer writer.Close()
-	defer eventWriter.Close()
-
-	var blobs []*Blob
-	var int64Ts []int64
 	var startTs, endTs Timestamp
 	startTs, endTs = math.MaxUint64, 0
 	for _, ts := range data.Tss {
@@ -803,61 +792,45 @@ func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID U
 		if ts > endTs {
 			endTs = ts
 		}
-		int64Ts = append(int64Ts, int64(ts))
 	}
-	var int64Pk []int64
-	for _, pk := range data.Pks {
-		int64Pk = append(int64Pk, pk.GetValue().(int64))
-	}
-	err = eventWriter.AddInt64ToPayload(int64Pk, nil)
-	if err != nil {
-		return nil, err
-	}
-	eventWriter.SetEventTimestamp(startTs, endTs)
-	writer.SetEventTimeStamp(startTs, endTs)
-	writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64Pk)))
-	err = writer.Finish()
-	if err != nil {
-		return nil, err
-	}
-	buffer, err := writer.GetBuffer()
-	if err != nil {
-		return nil, err
-	}
-	blobs = append(blobs, &Blob{
-		Key:   strconv.Itoa(int(common.RowIDField)),
-		Value: buffer,
-	})
-	eventWriter.Close()
-	writer.Close()
-
-	// write ts
-	writer = NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID)
-	eventWriter, err = writer.NextDeleteEventWriter()
-	if err != nil {
-		return nil, err
-	}
-
-	err = eventWriter.AddInt64ToPayload(int64Ts, nil)
-	if err != nil {
-		return nil, err
-	}
-	eventWriter.SetEventTimestamp(startTs, endTs)
-	writer.SetEventTimeStamp(startTs, endTs)
-	writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64Ts)))
-	err = writer.Finish()
-	if err != nil {
-		return nil, err
-	}
-	if buffer, err = writer.GetBuffer(); err != nil {
-		return nil, err
-	}
-	blobs = append(blobs, &Blob{
-		Key:        strconv.Itoa(int(common.TimeStampField)),
-		Value:      buffer,
-		MemorySize: data.Size(),
-	})
+	for _, blobKey := range []FieldID{common.RowIDField, common.TimeStampField} {
+		writer = NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID)
+		eventWriter, err := writer.NextDeleteEventWriter()
+		if err != nil {
+			writer.Close()
+			return nil, err
+		}
+		var int64data []int64
+		for i := 0; i < length; i++ {
+			switch blobKey {
+			case common.RowIDField:
+				int64data = append(int64data, data.Pks[i].GetValue().(int64))
+			case common.TimeStampField:
+				int64data = append(int64data, int64(data.Tss[i]))
+			}
+		}
+		err = eventWriter.AddInt64ToPayload(int64data, nil)
+		if err != nil {
+			return nil, err
+		}
+		eventWriter.SetEventTimestamp(startTs, endTs)
+		writer.SetEventTimeStamp(startTs, endTs)
+		writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64data)))
+		err = writer.Finish()
+		if err != nil {
+			return nil, err
+		}
+		buffer, err := writer.GetBuffer()
+		if err != nil {
+			return nil, err
+		}
+		blobs = append(blobs, &Blob{
+			Key:   strconv.Itoa(int(blobKey)),
+			Value: buffer,
+		})
+		eventWriter.Close()
+		writer.Close()
+	}
 	return blobs, nil
 }
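
For orientation, here is a minimal caller-side sketch of the refactored SerializeV2. It assumes the internal/storage package context, an Int64 primary key (matching the pk.GetValue().(int64) assertion in the loop above), and the existing NewDeleteCodec and NewInt64PrimaryKey helpers; it is an illustration, not part of this commit.

	// Sketch only: build a small DeleteData and serialize it into one
	// binlog blob per field (pks, then timestamps).
	func serializeDeletesV2(collectionID, partitionID, segmentID UniqueID) ([]*Blob, error) {
		data := &DeleteData{
			Pks: []PrimaryKey{NewInt64PrimaryKey(10), NewInt64PrimaryKey(11)},
			Tss: []Timestamp{100, 101},
		}
		// The returned slice follows the loop order above: blobs[0] is keyed
		// by common.RowIDField (pks), blobs[1] by common.TimeStampField (timestamps).
		return NewDeleteCodec().SerializeV2(collectionID, partitionID, segmentID, data)
	}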

31 changes: 16 additions & 15 deletions internal/storage/serde_events.go
@@ -186,7 +186,6 @@ func newCompositeBinlogRecordReader(blobs []*Blob) (*compositeBinlogRecordReader
 		})
 		sortedBlobs = append(sortedBlobs, blobsForField)
 	}
-	fmt.Println(blobMap)
 	return &compositeBinlogRecordReader{
 		blobs: sortedBlobs,
 	}, nil
@@ -559,33 +558,34 @@ func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *De
 	}, batchSize), nil
 }
 
-func NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID UniqueID) *DeltalogStreamWriterV2 {
-	dws := &DeltalogStreamWriterV2{}
-	dws.pkStreamWriter = &DeltalogStreamWriter{
+func NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID UniqueID) map[FieldID]*BinlogStreamWriter {
+	dws := make(map[FieldID]*BinlogStreamWriter, 2)
+	dws[common.RowIDField] = &BinlogStreamWriter{
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
 		fieldSchema:  &schemapb.FieldSchema{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
 	}
-	dws.tsStreamWriter = &DeltalogStreamWriter{
+	dws[common.TimeStampField] = &BinlogStreamWriter{
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
 		fieldSchema:  &schemapb.FieldSchema{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
 	}
 	return dws
 }
 
-func NewDeltalogSerializeWriterV2(partitionID, segmentID UniqueID, eventWriter *DeltalogStreamWriterV2, batchSize int,
+func NewDeltalogSerializeWriterV2(partitionID, segmentID UniqueID, eventWriters map[FieldID]*BinlogStreamWriter, batchSize int,
 ) (*SerializeWriter[*DeleteLog], error) {
-	pkRw, err := eventWriter.pkStreamWriter.GetRecordWriter()
-	if err != nil {
-		return nil, err
-	}
-	tsRw, err := eventWriter.tsStreamWriter.GetRecordWriter()
-	if err != nil {
-		return nil, err
-	}
-	rws := make(map[FieldID]RecordWriter, 2)
-	rws[common.PrimaryKeyField], rws[common.TimeStampField] = pkRw, tsRw
+	rws := make(map[FieldID]RecordWriter, len(eventWriters))
+	for fid := range eventWriters {
+		w := eventWriters[fid]
+		rw, err := w.GetRecordWriter()
+		if err != nil {
+			return nil, err
+		}
+		rws[fid] = rw
+	}
 	compositeRecordWriter := newCompositeRecordWriter(rws)
 	return NewSerializeRecordWriter(compositeRecordWriter, func(v []*DeleteLog) (Record, uint64, error) {
 		builders := [2]array.Builder{
@@ -642,6 +642,7 @@ func NewDeltalogDeserializeReaderV2(blobs []*Blob, PKfieldID UniqueID) (*Deseria
 
 		for j, dt := range r.Schema() {
 			d, ok := serdeMap[dt].deserialize(r.Column(j), i)
+			fmt.Println(d)
 			if ok {
 				switch j {
 				case common.TimeStampField:
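
A write-path sketch for the V2 deltalog writers above, assuming the existing NewDeleteLog and NewInt64PrimaryKey helpers and a SerializeWriter with Write/Close methods as used elsewhere in this package; illustrative only, not part of this commit.

	// Sketch only: each DeleteLog fans out through the composite record
	// writer, pks into the rowid binlog and timestamps into the ts binlog.
	func writeDeltalogV2() error {
		writers := NewDeltalogStreamWriterV2(0, 0, 0)
		w, err := NewDeltalogSerializeWriterV2(0, 0, writers, 7)
		if err != nil {
			return err
		}
		if err := w.Write(NewDeleteLog(NewInt64PrimaryKey(1), 100)); err != nil {
			return err
		}
		return w.Close() // flush any pending batch
	}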
23 changes: 13 additions & 10 deletions internal/storage/serde_events_test.go
@@ -390,8 +390,8 @@ func TestDeltalogSerializeWriterV2(t *testing.T) {
 	defer reader.Close()
 
 	// Copy write the generated data
-	eventWriter := NewDeltalogStreamWriterV2(0, 0, 0)
-	writer, err := NewDeltalogSerializeWriterV2(0, 0, eventWriter, 7)
+	writers := NewDeltalogStreamWriterV2(0, 0, 0)
+	writer, err := NewDeltalogSerializeWriterV2(0, 0, writers, 7)
 	assert.NoError(t, err)
 
 	for i := 0; i < size; i++ {
@@ -410,15 +410,18 @@ func TestDeltalogSerializeWriterV2(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Read from the written data
-	pkBlob, err := eventWriter.pkStreamWriter.Finalize()
-	assert.NoError(t, err)
-	assert.NotNil(t, pkBlob)
-
-	tsBlob, err := eventWriter.tsStreamWriter.Finalize()
-	assert.NoError(t, err)
-	assert.NotNil(t, tsBlob)
+	newblobs := make([]*Blob, len(writers))
+	i := 0
+	for _, w := range writers {
+		blob, err := w.Finalize()
+		assert.NoError(t, err)
+		assert.NotNil(t, blob)
+		assert.True(t, blob.MemorySize > 0)
+		newblobs[i] = blob
+		i++
+	}
 	// assert.Equal(t, blobs[0].Value, newblobs[0].Value)
-	reader, err = NewDeltalogDeserializeReaderV2([]*Blob{pkBlob, tsBlob}, common.RowIDField)
+	reader, err = NewDeltalogDeserializeReaderV2(blobs, common.RowIDField)
 	assert.NoError(t, err)
 	defer reader.Close()
 	for i := 0; i < size; i++ {
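
One caveat on the read-back loop above: ranging over a Go map has no fixed order, so the positions in newblobs are nondeterministic, which is presumably why the positional assert.Equal stays commented out and the reader is fed blobs instead. A deterministic variant, assuming the same two field IDs, could look like:

	// Sketch only: fix the field order instead of ranging over the map,
	// so newblobs[0] is always the pk blob and newblobs[1] the ts blob.
	newblobs := make([]*Blob, 0, len(writers))
	for _, fid := range []FieldID{common.RowIDField, common.TimeStampField} {
		blob, err := writers[fid].Finalize()
		assert.NoError(t, err)
		assert.NotNil(t, blob)
		newblobs = append(newblobs, blob)
	}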
