diff --git a/internal/storage/event_data.go b/internal/storage/event_data.go
index fe9f055324862..b6c55cb3116c0 100644
--- a/internal/storage/event_data.go
+++ b/internal/storage/event_data.go
@@ -32,6 +32,9 @@ import (
 
 const originalSizeKey = "original_size"
 
+// useMultiFieldFormat marks that a log file stores multiple fields
+const useMultiFieldFormat = "useMultiFieldFormat"
+
 type descriptorEventData struct {
 	DescriptorEventDataFixPart
 	ExtraLength int32
diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go
index 58c6c5a9ca2f9..6b9390da0a387 100644
--- a/internal/storage/event_writer.go
+++ b/internal/storage/event_writer.go
@@ -212,6 +212,16 @@ func newDescriptorEvent() *descriptorEvent {
 	}
 }
 
+func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int64) *descriptorEvent {
+	de := newDescriptorEvent()
+	de.CollectionID = collectionID
+	de.PartitionID = partitionID
+	de.SegmentID = segmentID
+	de.StartTimestamp = 0
+	de.EndTimestamp = 0
+	return de
+}
+
 func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int) (*insertEventWriter, error) {
 	var payloadWriter PayloadWriterInterface
 	var err error
diff --git a/internal/storage/serde.go b/internal/storage/serde.go
index 2f243aa166ef9..59761911f4338 100644
--- a/internal/storage/serde.go
+++ b/internal/storage/serde.go
@@ -675,6 +675,46 @@ func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Wr
 	}, nil
 }
 
+var _ RecordWriter = (*multiFieldRecordWriter)(nil)
+
+type multiFieldRecordWriter struct {
+	fw       *pqarrow.FileWriter
+	fieldIds []FieldID
+	schema   *arrow.Schema
+
+	numRows int
+}
+
+func (mfw *multiFieldRecordWriter) Write(r Record) error {
+	mfw.numRows += r.Len()
+	columns := make([]arrow.Array, len(mfw.fieldIds))
+	for i, fieldId := range mfw.fieldIds {
+		columns[i] = r.Column(fieldId)
+	}
+	rec := array.NewRecord(mfw.schema, columns, int64(r.Len()))
+	defer rec.Release()
+	return mfw.fw.WriteBuffered(rec)
+}
+
+func (mfw *multiFieldRecordWriter) Close() {
+	mfw.fw.Close()
+}
+
+func newMultiFieldRecordWriter(fieldIds []FieldID, fields []arrow.Field, writer io.Writer) (*multiFieldRecordWriter, error) {
+	schema := arrow.NewSchema(fields, nil)
+	fw, err := pqarrow.NewFileWriter(schema, writer,
+		parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)), // No additional grouping for now.
+		pqarrow.DefaultWriterProps())
+	if err != nil {
+		return nil, err
+	}
+	return &multiFieldRecordWriter{
+		fw:       fw,
+		fieldIds: fieldIds,
+		schema:   schema,
+	}, nil
+}
+
 type SerializeWriter[T any] struct {
 	rw         RecordWriter
 	serializer Serializer[T]
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index 7080de02ae850..910e6e41c1908 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -314,14 +314,9 @@ func (bsw *BinlogStreamWriter) writeBinlogHeaders(w io.Writer) error {
 		return err
 	}
 	// Write descriptor
-	de := newDescriptorEvent()
+	de := NewBaseDescriptorEvent(bsw.collectionID, bsw.partitionID, bsw.segmentID)
 	de.PayloadDataType = bsw.fieldSchema.DataType
-	de.CollectionID = bsw.collectionID
-	de.PartitionID = bsw.partitionID
-	de.SegmentID = bsw.segmentID
 	de.FieldID = bsw.fieldSchema.FieldID
-	de.StartTimestamp = 0
-	de.EndTimestamp = 0
 	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(bsw.memorySize))
 	if err := de.Write(w); err != nil {
 		return err
@@ -420,6 +415,7 @@ type DeltalogStreamWriter struct {
 	collectionID UniqueID
 	partitionID  UniqueID
 	segmentID    UniqueID
+	fieldSchema  *schemapb.FieldSchema
 
 	memorySize int // To be updated on the fly
 	buf        bytes.Buffer
@@ -430,17 +426,24 @@ func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
 	if dsw.rw != nil {
 		return dsw.rw, nil
 	}
-
-	rw, err := newSingleFieldRecordWriter(0, arrow.Field{
-		Name:     "delta",
-		Type:     arrow.BinaryTypes.String,
-		Nullable: false,
-	}, &dsw.buf)
-	if err != nil {
-		return nil, err
+	switch dsw.fieldSchema.DataType {
+	case schemapb.DataType_Int64, schemapb.DataType_VarChar, schemapb.DataType_String:
+		dim, _ := typeutil.GetDim(dsw.fieldSchema)
+		rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{
+			Name:     dsw.fieldSchema.Name,
+			Type:     serdeMap[dsw.fieldSchema.DataType].arrowType(int(dim)),
+			Nullable: false, // No nullable check here.
+		}, &dsw.buf)
+		if err != nil {
+			return nil, err
+		}
+		dsw.rw = rw
+		return rw, nil
+	default:
+		return nil, merr.WrapErrServiceInternal(fmt.Sprintf(
+			"does not support delta log primary key data type %s",
+			dsw.fieldSchema.DataType.String()))
+	}
-	dsw.rw = rw
-	return rw, nil
 }
 
 func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
@@ -469,13 +472,9 @@ func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
 		return err
 	}
 	// Write descriptor
-	de := newDescriptorEvent()
-	de.PayloadDataType = schemapb.DataType_String
-	de.CollectionID = dsw.collectionID
-	de.PartitionID = dsw.partitionID
-	de.SegmentID = dsw.segmentID
-	de.StartTimestamp = 0
-	de.EndTimestamp = 0
+	de := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
+	de.PayloadDataType = dsw.fieldSchema.DataType
+	de.FieldID = dsw.fieldSchema.FieldID
 	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
 	if err := de.Write(w); err != nil {
 		return err
@@ -502,6 +501,11 @@ func NewDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *Del
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
+		fieldSchema: &schemapb.FieldSchema{
+			FieldID:  common.RowIDField,
+			Name:     "delta",
+			DataType: schemapb.DataType_String,
+		},
 	}
 }
 
@@ -542,3 +546,318 @@ func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *De
 		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), schema, field2Col), memorySize, nil
 	}, batchSize), nil
 }
+
+var _ RecordReader = (*simpleArrowRecordReader)(nil)
+
+type simpleArrowRecordReader struct {
+	blobs []*Blob
+
+	blobPos int
+	rr      array.RecordReader
+	closer  func()
+
+	r *simpleArrowRecord
+}
+
+func (crr *simpleArrowRecordReader) iterateNextBatch() error {
+	if crr.closer != nil {
+		crr.closer()
+	}
+
+	crr.blobPos++
+	if crr.blobPos >= len(crr.blobs) {
+		return io.EOF
+	}
+
+	reader, err := NewBinlogReader(crr.blobs[crr.blobPos].Value)
+	if err != nil {
+		return err
+	}
+
+	er, err := reader.NextEventReader()
+	if err != nil {
+		return err
+	}
+	rr, err := er.GetArrowRecordReader()
+	if err != nil {
+		return err
+	}
+	crr.rr = rr
+	crr.closer = func() {
+		crr.rr.Release()
+		reader.Close()
+	}
+
+	return nil
+}
+
+func (crr *simpleArrowRecordReader) Next() error {
+	if crr.rr == nil {
+		if crr.blobs == nil || len(crr.blobs) == 0 {
+			return io.EOF
+		}
+		crr.blobPos = -1
+		crr.r = &simpleArrowRecord{
+			schema:    make(map[FieldID]schemapb.DataType),
+			field2Col: make(map[FieldID]int),
+		}
+		if err := crr.iterateNextBatch(); err != nil {
+			return err
+		}
+	}
+
+	composeRecord := func() bool {
+		if ok := crr.rr.Next(); !ok {
+			return false
+		}
+		record := crr.rr.Record()
+		for i := range record.Schema().Fields() {
+			crr.r.schema[FieldID(i)] = schemapb.DataType_Int64
+			crr.r.field2Col[FieldID(i)] = i
+		}
+		crr.r.r = record
+		return true
+	}
+
+	if ok := composeRecord(); !ok {
+		if err := crr.iterateNextBatch(); err != nil {
+			return err
+		}
+		if ok := composeRecord(); !ok {
+			return io.EOF
+		}
+	}
+	return nil
+}
+
+func (crr *simpleArrowRecordReader) Record() Record {
+	return crr.r
+}
+
+func (crr *simpleArrowRecordReader) Close() {
+	if crr.closer != nil {
+		crr.closer()
+	}
+}
+
+func newSimpleArrowRecordReader(blobs []*Blob) (*simpleArrowRecordReader, error) {
+	return &simpleArrowRecordReader{
+		blobs: blobs,
+	}, nil
+}
+
+func NewMultiFieldDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *MultiFieldDeltalogStreamWriter {
+	return &MultiFieldDeltalogStreamWriter{
+		collectionID: collectionID,
+		partitionID:  partitionID,
+		segmentID:    segmentID,
+		fieldSchemas: []*schemapb.FieldSchema{
+			{
+				FieldID:  common.RowIDField,
+				Name:     "pk",
+				DataType: schemapb.DataType_Int64,
+			},
+			{
+				FieldID:  common.TimeStampField,
+				Name:     "ts",
+				DataType: schemapb.DataType_Int64,
+			},
+		},
+	}
+}
+
+type MultiFieldDeltalogStreamWriter struct {
+	collectionID UniqueID
+	partitionID  UniqueID
+	segmentID    UniqueID
+	fieldSchemas []*schemapb.FieldSchema
+
+	memorySize int // To be updated on the fly
+	buf        bytes.Buffer
+	rw         *multiFieldRecordWriter
+}
+
+func (dsw *MultiFieldDeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
+	if dsw.rw != nil {
+		return dsw.rw, nil
+	}
+
+	fieldIds := make([]FieldID, len(dsw.fieldSchemas))
+	fields := make([]arrow.Field, len(dsw.fieldSchemas))
+
+	for i, fieldSchema := range dsw.fieldSchemas {
+		fieldIds[i] = fieldSchema.FieldID
+		dim, _ := typeutil.GetDim(fieldSchema)
+		fields[i] = arrow.Field{
+			Name:     fieldSchema.Name,
+			Type:     serdeMap[fieldSchema.DataType].arrowType(int(dim)),
+			Nullable: false, // No nullable check here.
+		}
+	}
+
+	rw, err := newMultiFieldRecordWriter(fieldIds, fields, &dsw.buf)
+	if err != nil {
+		return nil, err
+	}
+	dsw.rw = rw
+	return rw, nil
+}
+
+func (dsw *MultiFieldDeltalogStreamWriter) Finalize() (*Blob, error) {
+	if dsw.rw == nil {
+		return nil, io.ErrUnexpectedEOF
+	}
+	dsw.rw.Close()
+
+	var b bytes.Buffer
+	if err := dsw.writeDeltalogHeaders(&b); err != nil {
+		return nil, err
+	}
+	if _, err := b.Write(dsw.buf.Bytes()); err != nil {
+		return nil, err
+	}
+	return &Blob{
+		Value:      b.Bytes(),
+		RowNum:     int64(dsw.rw.numRows),
+		MemorySize: int64(dsw.memorySize),
+	}, nil
+}
+
+func (dsw *MultiFieldDeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
+	// Write magic number
+	if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
+		return err
+	}
+	// Write descriptor
+	de := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
+	de.PayloadDataType = schemapb.DataType_Int64
+	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
+	de.descriptorEventData.AddExtra(useMultiFieldFormat, true)
+	if err := de.Write(w); err != nil {
+		return err
+	}
+	// Write event header
+	eh := newEventHeader(DeleteEventType)
+	// Write event data
+	ev := newDeleteEventData()
+	ev.StartTimestamp = 1
+	ev.EndTimestamp = 1
+	eh.EventLength = int32(dsw.buf.Len()) + eh.GetMemoryUsageInBytes() + int32(binary.Size(ev))
+	// eh.NextPosition = eh.EventLength + w.Offset()
+	if err := eh.Write(w); err != nil {
+		return err
+	}
+	if err := ev.WriteEventData(w); err != nil {
+		return err
+	}
+	return nil
+}
+
+func NewDeltalogMultiFieldWriter(partitionID, segmentID UniqueID, eventWriter *MultiFieldDeltalogStreamWriter, batchSize int,
+) (*SerializeWriter[*DeleteLog], error) {
+	rw, err := eventWriter.GetRecordWriter()
+	if err != nil {
+		return nil, err
+	}
+	return NewSerializeRecordWriter[*DeleteLog](rw, func(v []*DeleteLog) (Record, uint64, error) {
+		builders := [2]*array.Int64Builder{
+			array.NewInt64Builder(memory.DefaultAllocator),
+			array.NewInt64Builder(memory.DefaultAllocator),
+		}
+		var memorySize uint64
+		for _, vv := range v {
+			builders[0].Append(vv.Pk.GetValue().(int64))
+			memorySize += uint64(vv.Pk.GetValue().(int64))
+
+			builders[1].Append(int64(vv.Ts))
+			memorySize += uint64(vv.Ts)
+		}
+		arrs := []arrow.Array{builders[0].NewArray(), builders[1].NewArray()}
+		fields := []arrow.Field{
+			{
+				Name:     "pk",
+				Type:     arrow.PrimitiveTypes.Int64,
+				Nullable: false,
+			},
+			{
+				Name:     "ts",
+				Type:     arrow.PrimitiveTypes.Int64,
+				Nullable: false,
+			},
+		}
+		field2Col := map[FieldID]int{
+			common.RowIDField:     0,
+			common.TimeStampField: 1,
+		}
+		schema := map[FieldID]schemapb.DataType{
+			common.RowIDField:     schemapb.DataType_Int64,
+			common.TimeStampField: schemapb.DataType_Int64,
+		}
+		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrs, int64(len(v))), schema, field2Col), memorySize, nil
+	}, batchSize), nil
+}
+
+func NewDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
+	reader, err := newSimpleArrowRecordReader(blobs)
+	if err != nil {
+		return nil, err
+	}
+	return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
+		for i := 0; i < r.Len(); i++ {
+			if v[i] == nil {
+				v[i] = &DeleteLog{}
+			}
+		}
+		for fid := range r.Schema() {
+			a := r.Column(fid)
+
+			switch fid {
+			case common.RowIDField:
+				int64Array := a.(*array.Int64)
+				for i := 0; i < int64Array.Len(); i++ {
+					v[i].Pk = NewInt64PrimaryKey(int64Array.Value(i))
+				}
+			case common.TimeStampField:
+				int64Array := a.(*array.Int64)
+				for i := 0; i < int64Array.Len(); i++ {
+					v[i].Ts = uint64(int64Array.Value(i))
+				}
+			default:
+				return fmt.Errorf("unexpected field ID: %d", fid)
+			}
+		}
+		return nil
+	}), nil
+}
+
+func NewDeltalogDeserializeReaderUnified(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
+	if supportMultiFieldFormat(blobs) {
+		return NewDeltalogMultiFieldReader(blobs)
+	}
+	return NewDeltalogDeserializeReader(blobs)
+}
+
+// supportMultiFieldFormat checks the delta log descriptor data to see whether
+// the blob stores the pk and ts columns separately
+func supportMultiFieldFormat(blobs []*Blob) bool {
+	if len(blobs) > 0 {
+		reader, err := NewBinlogReader(blobs[0].Value)
+		if err != nil {
+			return false
+		}
+		defer reader.Close()
+		er, err := reader.NextEventReader()
+		if err != nil {
+			return false
+		}
+		defer er.Close()
+		rr, err := er.GetArrowRecordReader()
+		if err != nil {
+			return false
+		}
+		defer rr.Release()
+		useMultiFieldFormat := reader.descriptorEventData.Extras[useMultiFieldFormat]
+		return useMultiFieldFormat != nil && useMultiFieldFormat.(bool)
+	}
+	return false
+}
diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go
index 1b0d7ad88fe12..6523834cf111f 100644
--- a/internal/storage/serde_events_test.go
+++ b/internal/storage/serde_events_test.go
@@ -291,7 +291,7 @@ func TestDeltalogDeserializeReader(t *testing.T) {
 
 func TestDeltalogSerializeWriter(t *testing.T) {
 	t.Run("test empty data", func(t *testing.T) {
-		reader, err := NewDeltalogDeserializeReader(nil)
+		reader, err := NewDeltalogDeserializeReaderUnified(nil)
 		assert.NoError(t, err)
 		defer reader.Close()
 		err = reader.Next()
@@ -302,7 +302,7 @@ func TestDeltalogSerializeWriter(t *testing.T) {
 		size := 16
 		blob, err := generateTestDeltalogData(size)
 		assert.NoError(t, err)
-		reader, err := NewDeltalogDeserializeReader([]*Blob{blob})
+		reader, err := NewDeltalogDeserializeReaderUnified([]*Blob{blob})
 		assert.NoError(t, err)
 		defer reader.Close()
 
@@ -331,7 +331,7 @@ func TestDeltalogSerializeWriter(t *testing.T) {
 		assert.NoError(t, err)
 		assert.NotNil(t, newblob)
 		// assert.Equal(t, blobs[0].Value, newblobs[0].Value)
-		reader, err = NewDeltalogDeserializeReader([]*Blob{newblob})
+		reader, err = NewDeltalogDeserializeReaderUnified([]*Blob{newblob})
 		assert.NoError(t, err)
 		defer reader.Close()
 		for i := 0; i < size; i++ {
@@ -343,3 +343,56 @@ func TestDeltalogSerializeWriter(t *testing.T) {
 		}
 	})
 }
+
+func TestDeltalogV2(t *testing.T) {
+	t.Run("test empty data", func(t *testing.T) {
+		reader, err := NewDeltalogDeserializeReaderUnified(nil)
+		assert.NoError(t, err)
+		defer reader.Close()
+		err = reader.Next()
+		assert.Equal(t, io.EOF, err)
+	})
+
+	t.Run("test serialize deserialize", func(t *testing.T) {
+		eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0)
+		writer, err := NewDeltalogMultiFieldWriter(0, 0, eventWriter, 7)
+		assert.NoError(t, err)
+
+		size := 10
+		pks := make([]int64, size)
+		tss := make([]uint64, size)
+		for i := 0; i < size; i++ {
+			pks[i] = int64(i)
+			tss[i] = uint64(i + 1)
+		}
+		data := make([]*DeleteLog, size)
+		for i := range pks {
+			data[i] = NewDeleteLog(NewInt64PrimaryKey(pks[i]), tss[i])
+		}
+
+		// Serialize the data
+		for i := 0; i < size; i++ {
+			err := writer.Write(data[i])
+			assert.NoError(t, err)
+		}
+		err = writer.Close()
+		assert.NoError(t, err)
+
+		blob, err := eventWriter.Finalize()
+		assert.NoError(t, err)
+		assert.NotNil(t, blob)
+		blobs := []*Blob{blob}
+
+		// Deserialize the data
+		reader, err := NewDeltalogDeserializeReaderUnified(blobs)
+		assert.NoError(t, err)
+		defer reader.Close()
+		for i := 0; i < size; i++ {
+			err = reader.Next()
+			assert.NoError(t, err)
+
+			value := reader.Value()
+			assertTestDeltalogData(t, i, value)
+		}
+	})
+}
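
Note (not part of the diff): the sketch below outlines a possible companion test that exercises supportMultiFieldFormat directly, covering the format-detection step that NewDeltalogDeserializeReaderUnified relies on. It only assumes the helper generateTestDeltalogData already present in serde_events_test.go plus the functions added above; the test name itself is hypothetical.

// Sketch only: verify that supportMultiFieldFormat distinguishes the legacy
// single-column layout from the new pk/ts layout. generateTestDeltalogData
// produces a legacy blob; MultiFieldDeltalogStreamWriter produces the new format.
func TestSupportMultiFieldFormatDetection(t *testing.T) {
	// Nil or empty input falls back to the legacy path.
	assert.False(t, supportMultiFieldFormat(nil))

	// Legacy blob: descriptor carries no useMultiFieldFormat extra.
	legacy, err := generateTestDeltalogData(4)
	assert.NoError(t, err)
	assert.False(t, supportMultiFieldFormat([]*Blob{legacy}))

	// New blob: the multi-field writer sets the useMultiFieldFormat extra to true.
	eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0)
	writer, err := NewDeltalogMultiFieldWriter(0, 0, eventWriter, 4)
	assert.NoError(t, err)
	assert.NoError(t, writer.Write(NewDeleteLog(NewInt64PrimaryKey(1), 1)))
	assert.NoError(t, writer.Close())
	blob, err := eventWriter.Finalize()
	assert.NoError(t, err)
	assert.True(t, supportMultiFieldFormat([]*Blob{blob}))
}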