add delta log stream new format reader and writer
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Jun 27, 2024
1 parent 380d3f4 commit 4a2cc18
Showing 5 changed files with 451 additions and 26 deletions.
3 changes: 3 additions & 0 deletions internal/storage/event_data.go
@@ -32,6 +32,9 @@ import (

const originalSizeKey = "original_size"

// useMultiFieldFormat marks a log file whose payload stores multiple fields.
const useMultiFieldFormat = "useMultiFieldFormat"

type descriptorEventData struct {
	DescriptorEventDataFixPart
	ExtraLength int32
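Note: the constant above is only a metadata key; this hunk does not show how readers consume it. As a rough sketch (the helper name and the string encoding of the flag are assumptions, not part of this diff), a reader could gate format selection on the key like this:

package main

import "fmt"

// useMultiFieldFormat mirrors the constant added in event_data.go.
const useMultiFieldFormat = "useMultiFieldFormat"

// pickDeltaLogFormat is a hypothetical helper: it inspects a descriptor
// event's extra metadata and decides which reader path to take. How the
// flag value is encoded (string "true" here) is an assumption.
func pickDeltaLogFormat(extras map[string]interface{}) string {
	if v, ok := extras[useMultiFieldFormat]; ok {
		if s, ok := v.(string); ok && s == "true" {
			return "multi-field"
		}
	}
	return "single-field"
}

func main() {
	fmt.Println(pickDeltaLogFormat(map[string]interface{}{useMultiFieldFormat: "true"})) // multi-field
	fmt.Println(pickDeltaLogFormat(map[string]interface{}{}))                            // single-field
}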
10 changes: 10 additions & 0 deletions internal/storage/event_writer.go
@@ -212,6 +212,16 @@ func newDescriptorEvent() *descriptorEvent {
	}
}

func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int64) *descriptorEvent {
	de := newDescriptorEvent()
	de.CollectionID = collectionID
	de.PartitionID = partitionID
	de.SegmentID = segmentID
	de.StartTimestamp = 0
	de.EndTimestamp = 0
	return de
}

func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int) (*insertEventWriter, error) {
	var payloadWriter PayloadWriterInterface
	var err error
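For reference, NewBaseDescriptorEvent only pins down the identity of the log (collection, partition, segment) and leaves both timestamps at zero for the caller to fill in later. A small, self-contained sketch of that pattern, using a local stand-in struct rather than the real descriptorEvent (which carries many more fields in internal/storage):

package main

import "fmt"

// baseDescriptor mirrors only the fields that NewBaseDescriptorEvent touches;
// the real descriptorEvent in internal/storage is considerably larger.
type baseDescriptor struct {
	CollectionID   int64
	PartitionID    int64
	SegmentID      int64
	StartTimestamp uint64
	EndTimestamp   uint64
}

// newBaseDescriptor follows the same pattern as NewBaseDescriptorEvent:
// identity fields are set up front, timestamps stay zero until the writer
// knows the actual time range of the payload.
func newBaseDescriptor(collectionID, partitionID, segmentID int64) *baseDescriptor {
	return &baseDescriptor{
		CollectionID: collectionID,
		PartitionID:  partitionID,
		SegmentID:    segmentID,
	}
}

func main() {
	de := newBaseDescriptor(100, 200, 300)
	fmt.Printf("%+v\n", *de)
}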
40 changes: 40 additions & 0 deletions internal/storage/serde.go
@@ -675,6 +675,46 @@ func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Wr
	}, nil
}

var _ RecordWriter = (*multiFieldRecordWriter)(nil)

// multiFieldRecordWriter writes records spanning multiple fields into a
// single parquet file, one arrow column per field.
type multiFieldRecordWriter struct {
	fw       *pqarrow.FileWriter
	fieldIds []FieldID
	schema   *arrow.Schema

	numRows int
}

func (mfw *multiFieldRecordWriter) Write(r Record) error {
	mfw.numRows += r.Len()
	columns := make([]arrow.Array, len(mfw.fieldIds))
	for i, fieldId := range mfw.fieldIds {
		columns[i] = r.Column(fieldId)
	}
	rec := array.NewRecord(mfw.schema, columns, int64(r.Len()))
	defer rec.Release()
	return mfw.fw.WriteBuffered(rec)
}

func (mfw *multiFieldRecordWriter) Close() {
	mfw.fw.Close()
}

func newMultiFieldRecordWriter(fieldIds []FieldID, fields []arrow.Field, writer io.Writer) (*multiFieldRecordWriter, error) {
	schema := arrow.NewSchema(fields, nil)
	fw, err := pqarrow.NewFileWriter(schema, writer,
		parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)), // No additional grouping for now.
		pqarrow.DefaultWriterProps())
	if err != nil {
		return nil, err
	}
	return &multiFieldRecordWriter{
		fw:       fw,
		fieldIds: fieldIds,
		schema:   schema,
	}, nil
}

type SerializeWriter[T any] struct {
	rw         RecordWriter
	serializer Serializer[T]
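The multiFieldRecordWriter above is a thin wrapper over pqarrow's FileWriter: each Write call assembles the selected field columns into one arrow record and buffers it into a single parquet file, so all fields of a delta log land in one object. A standalone sketch of that pattern against the public Arrow Go API (the two-column pk/ts schema is an illustrative stand-in for a delta log record, and the arrow import path version may differ from the one Milvus pins):

package main

import (
	"bytes"
	"fmt"
	"math"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/apache/arrow/go/v12/parquet"
	"github.com/apache/arrow/go/v12/parquet/pqarrow"
)

func main() {
	// Two columns standing in for a multi-field delta log record: primary key and timestamp.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "pk", Type: arrow.PrimitiveTypes.Int64},
		{Name: "ts", Type: arrow.PrimitiveTypes.Int64},
	}, nil)

	var buf bytes.Buffer
	fw, err := pqarrow.NewFileWriter(schema, &buf,
		parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)), // single row group, as in the diff
		pqarrow.DefaultWriterProps())
	if err != nil {
		panic(err)
	}

	// Build one record holding both columns and buffer it into the parquet file.
	b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer b.Release()
	b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
	b.Field(1).(*array.Int64Builder).AppendValues([]int64{100, 101, 102}, nil)
	rec := b.NewRecord()
	defer rec.Release()

	if err := fw.WriteBuffered(rec); err != nil {
		panic(err)
	}
	if err := fw.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("wrote %d bytes of parquet with %d fields\n", buf.Len(), len(schema.Fields()))
}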