diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index 5a96589852fda..82ece3f2b2cc1 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -777,23 +777,12 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
 // For each delete message, it will save pk and ts string to binlog separately,
 // to avoid json marshal.
 func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, data *DeleteData) ([]*Blob, error) {
+	var blobs []*Blob
+	var writer *DeleteBinlogWriter
 	length := len(data.Pks)
 	if length != len(data.Tss) {
 		return nil, fmt.Errorf("the length of pks, and TimeStamps is not equal")
 	}
-
-	// write pk
-	writer := NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID)
-	eventWriter, err := writer.NextDeleteEventWriter()
-	if err != nil {
-		writer.Close()
-		return nil, err
-	}
-	defer writer.Close()
-	defer eventWriter.Close()
-
-	var blobs []*Blob
-	var int64Ts []int64
 	var startTs, endTs Timestamp
 	startTs, endTs = math.MaxUint64, 0
 	for _, ts := range data.Tss {
@@ -803,61 +792,45 @@ func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID U
 		if ts > endTs {
 			endTs = ts
 		}
-		int64Ts = append(int64Ts, int64(ts))
-	}
-	var int64Pk []int64
-	for _, pk := range data.Pks {
-		int64Pk = append(int64Pk, pk.GetValue().(int64))
-	}
-	err = eventWriter.AddInt64ToPayload(int64Pk, nil)
-	if err != nil {
-		return nil, err
-	}
-	eventWriter.SetEventTimestamp(startTs, endTs)
-	writer.SetEventTimeStamp(startTs, endTs)
-	writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64Pk)))
-	err = writer.Finish()
-	if err != nil {
-		return nil, err
 	}
-	buffer, err := writer.GetBuffer()
-	if err != nil {
-		return nil, err
-	}
-	blobs = append(blobs, &Blob{
-		Key:   strconv.Itoa(int(common.RowIDField)),
-		Value: buffer,
-	})
-	eventWriter.Close()
-	writer.Close()
-
-	// write ts
-	writer = NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID)
-	eventWriter, err = writer.NextDeleteEventWriter()
-	if err != nil {
+	for _, blobKey := range []FieldID{common.RowIDField, common.TimeStampField} {
+		writer = NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID)
+		eventWriter, err := writer.NextDeleteEventWriter()
+		if err != nil {
+			writer.Close()
+			return nil, err
+		}
+		var int64data []int64
+		for i := 0; i < length; i++ {
+			switch blobKey {
+			case common.RowIDField:
+				int64data = append(int64data, data.Pks[i].GetValue().(int64))
+			case common.TimeStampField:
+				int64data = append(int64data, int64(data.Tss[i]))
+			}
+		}
+		err = eventWriter.AddInt64ToPayload(int64data, nil)
+		if err != nil {
+			return nil, err
+		}
+		eventWriter.SetEventTimestamp(startTs, endTs)
+		writer.SetEventTimeStamp(startTs, endTs)
+		writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64data)))
+		err = writer.Finish()
+		if err != nil {
+			return nil, err
+		}
+		buffer, err := writer.GetBuffer()
+		if err != nil {
+			return nil, err
+		}
+		blobs = append(blobs, &Blob{
+			Key:   strconv.Itoa(int(blobKey)),
+			Value: buffer,
+		})
+		eventWriter.Close()
 		writer.Close()
-		return nil, err
 	}
-
-	err = eventWriter.AddInt64ToPayload(int64Ts, nil)
-	if err != nil {
-		return nil, err
-	}
-	eventWriter.SetEventTimestamp(startTs, endTs)
-	writer.SetEventTimeStamp(startTs, endTs)
-	writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64Ts)))
-	err = writer.Finish()
-	if err != nil {
-		return nil, err
-	}
-	if buffer, err = writer.GetBuffer(); err != nil {
-		return nil, err
-	}
-	blobs = append(blobs, &Blob{
-		Key:        strconv.Itoa(int(common.TimeStampField)),
-		Value:      buffer,
-		MemorySize: data.Size(),
-	})
 	return blobs, nil
 }
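As a reading aid (not part of the patch): a minimal sketch of how the refactored SerializeV2 is expected to be called, assuming it sits in package storage and that the NewInt64PrimaryKey helper and the DeleteData layout are as in the surrounding code. exampleSerializeV2 is a hypothetical name.

func exampleSerializeV2(codec *DeleteCodec) error {
	// Two deleted rows: int64 pks plus one timestamp per pk.
	dData := &DeleteData{
		Pks: []PrimaryKey{NewInt64PrimaryKey(10), NewInt64PrimaryKey(20)},
		Tss: []Timestamp{100, 200},
	}
	blobs, err := codec.SerializeV2(1, 2, 3, dData) // collection, partition, segment IDs
	if err != nil {
		return err
	}
	// The unified loop above emits exactly two Int64-payload blobs, keyed by
	// the stringified field IDs of common.RowIDField and common.TimeStampField.
	for _, b := range blobs {
		fmt.Printf("blob key=%s, %d bytes\n", b.Key, len(b.Value))
	}
	return nil
}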
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index 8f49a08f6023a..ada9db8806483 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -186,7 +186,6 @@ func newCompositeBinlogRecordReader(blobs []*Blob) (*compositeBinlogRecordReader
 		})
 		sortedBlobs = append(sortedBlobs, blobsForField)
 	}
-	fmt.Println(blobMap)
 	return &compositeBinlogRecordReader{
 		blobs: sortedBlobs,
 	}, nil
@@ -559,33 +558,33 @@ func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *De
 	}, batchSize), nil
 }
 
-func NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID UniqueID) *DeltalogStreamWriterV2 {
-	dws := &DeltalogStreamWriterV2{}
-	dws.pkStreamWriter = &DeltalogStreamWriter{
+func NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID UniqueID) map[FieldID]*BinlogStreamWriter {
+	dws := make(map[FieldID]*BinlogStreamWriter, 2)
+	dws[common.RowIDField] = &BinlogStreamWriter{
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
+		fieldSchema:  &schemapb.FieldSchema{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
 	}
-	dws.tsStreamWriter = &DeltalogStreamWriter{
+	dws[common.TimeStampField] = &BinlogStreamWriter{
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
+		fieldSchema:  &schemapb.FieldSchema{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
 	}
 	return dws
 }
 
-func NewDeltalogSerializeWriterV2(partitionID, segmentID UniqueID, eventWriter *DeltalogStreamWriterV2, batchSize int,
+func NewDeltalogSerializeWriterV2(partitionID, segmentID UniqueID, eventWriters map[FieldID]*BinlogStreamWriter, batchSize int,
 ) (*SerializeWriter[*DeleteLog], error) {
-	pkRw, err := eventWriter.pkStreamWriter.GetRecordWriter()
-	if err != nil {
-		return nil, err
-	}
-	tsRw, err := eventWriter.tsStreamWriter.GetRecordWriter()
-	if err != nil {
-		return nil, err
+	rws := make(map[FieldID]RecordWriter, len(eventWriters))
+	for fid, w := range eventWriters {
+		rw, err := w.GetRecordWriter()
+		if err != nil {
+			return nil, err
+		}
+		rws[fid] = rw
 	}
-	rws := make(map[FieldID]RecordWriter, 2)
-	rws[common.PrimaryKeyField], rws[common.TimeStampField] = pkRw, tsRw
 	compositeRecordWriter := newCompositeRecordWriter(rws)
 	return NewSerializeRecordWriter(compositeRecordWriter, func(v []*DeleteLog) (Record, uint64, error) {
 		builders := [2]array.Builder{
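Likewise, a hedged sketch of how the new map-based V2 writer API fits together, assuming the SerializeWriter Write/Close semantics used by the existing tests in this package; exampleDeltalogV2Write is a hypothetical name.

func exampleDeltalogV2Write(logs []*DeleteLog) ([]*Blob, error) {
	writers := NewDeltalogStreamWriterV2(0, 0, 0) // map[FieldID]*BinlogStreamWriter
	writer, err := NewDeltalogSerializeWriterV2(0, 0, writers, 16 /* batch size */)
	if err != nil {
		return nil, err
	}
	for _, l := range logs {
		if err := writer.Write(l); err != nil {
			return nil, err
		}
	}
	if err := writer.Close(); err != nil { // flush the last partial batch
		return nil, err
	}
	// Finalize one blob per per-field writer (rowid and ts).
	blobs := make([]*Blob, 0, len(writers))
	for _, w := range writers {
		b, err := w.Finalize()
		if err != nil {
			return nil, err
		}
		blobs = append(blobs, b)
	}
	return blobs, nil
}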
diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go
index b864a421207a3..651f660f30433 100644
--- a/internal/storage/serde_events_test.go
+++ b/internal/storage/serde_events_test.go
@@ -390,8 +390,8 @@ func TestDeltalogSerializeWriterV2(t *testing.T) {
 	defer reader.Close()
 
 	// Copy write the generated data
-	eventWriter := NewDeltalogStreamWriterV2(0, 0, 0)
-	writer, err := NewDeltalogSerializeWriterV2(0, 0, eventWriter, 7)
+	writers := NewDeltalogStreamWriterV2(0, 0, 0)
+	writer, err := NewDeltalogSerializeWriterV2(0, 0, writers, 7)
 	assert.NoError(t, err)
 
 	for i := 0; i < size; i++ {
@@ -410,15 +410,18 @@ func TestDeltalogSerializeWriterV2(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Read from the written data
-	pkBlob, err := eventWriter.pkStreamWriter.Finalize()
-	assert.NoError(t, err)
-	assert.NotNil(t, pkBlob)
-
-	tsBlob, err := eventWriter.tsStreamWriter.Finalize()
-	assert.NoError(t, err)
-	assert.NotNil(t, tsBlob)
+	newblobs := make([]*Blob, len(writers))
+	i := 0
+	for _, w := range writers {
+		blob, err := w.Finalize()
+		assert.NoError(t, err)
+		assert.NotNil(t, blob)
+		assert.True(t, blob.MemorySize > 0)
+		newblobs[i] = blob
+		i++
+	}
 	// assert.Equal(t, blobs[0].Value, newblobs[0].Value)
-	reader, err = NewDeltalogDeserializeReaderV2([]*Blob{pkBlob, tsBlob}, common.RowIDField)
+	reader, err = NewDeltalogDeserializeReaderV2(newblobs, common.RowIDField)
 	assert.NoError(t, err)
 	defer reader.Close()
 	for i := 0; i < size; i++ {
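And the matching read side, closing the round trip the test above performs. This assumes the DeserializeReader Next/Value API used by the surrounding tests and simplifies end-of-stream handling; exampleDeltalogV2Read is a hypothetical name. Blob order should not matter here, since the composite reader appears to group blobs by field ID before replaying them.

func exampleDeltalogV2Read(blobs []*Blob) ([]*DeleteLog, error) {
	reader, err := NewDeltalogDeserializeReaderV2(blobs, common.RowIDField)
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	var logs []*DeleteLog
	for {
		if err := reader.Next(); err != nil {
			break // assumed io.EOF at end of stream; real code should check the error
		}
		logs = append(logs, reader.Value())
	}
	return logs, nil
}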