enhance: reduce stats task cost by skipping ser/de
Signed-off-by: Ted Xu <[email protected]>
tedxu committed Jan 24, 2025
1 parent 5fb597b commit a80189f
Showing 11 changed files with 777 additions and 433 deletions.
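This commit replaces the row-oriented merge path in compaction: instead of deserializing every row into a storage.Value, pushing it through a priority queue, and re-serializing it into the new segment, the merge now runs over Arrow record batches via storage.MergeSort, with deletes and TTL expiry applied by a per-row predicate that reads the primary-key and timestamp columns in place. A minimal, self-contained sketch of the underlying idea — not Milvus code; boxedValue is a hypothetical stand-in for storage.Value:

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

// boxedValue stands in for storage.Value: one heap allocation per row.
type boxedValue struct {
	PK        int64
	Timestamp int64
}

func main() {
	b := array.NewInt64Builder(memory.DefaultAllocator)
	defer b.Release()
	b.AppendValues([]int64{3, 1, 2}, nil)
	pks := b.NewInt64Array()
	defer pks.Release()

	// Row-wise path (old): every row is boxed into a value object
	// before it can be inspected or re-serialized.
	rows := make([]*boxedValue, 0, pks.Len())
	for i := 0; i < pks.Len(); i++ {
		rows = append(rows, &boxedValue{PK: pks.Value(i)})
	}

	// Columnar path (new): read the column in place, no per-row boxing.
	var max int64
	for i := 0; i < pks.Len(); i++ {
		if v := pks.Value(i); v > max {
			max = v
		}
	}
	fmt.Println(len(rows), max)
}
```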
92 changes: 40 additions & 52 deletions internal/datanode/compaction/merge_sort.go
@@ -1,27 +1,46 @@
package compaction

import (
	"container/heap"
	"context"
	"fmt"
	sio "io"
	"math"
	"time"

	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/samber/lo"
	"go.opentelemetry.io/otel"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/flushcommon/io"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/proto/datapb"
	"github.com/milvus-io/milvus/pkg/util/timerecord"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

+type segmentWriterWrapper struct {
+	*MultiSegmentWriter
+}
+
+var _ storage.RecordWriter = (*segmentWriterWrapper)(nil)
+
+func (w *segmentWriterWrapper) GetWrittenUncompressed() uint64 {
+	return 0
+}
+
+func (w *segmentWriterWrapper) Write(record storage.Record) error {
+	return w.MultiSegmentWriter.WriteRecord(record)
+}
+
+func (w *segmentWriterWrapper) Close() error {
+	return nil
+}

func mergeSortMultipleSegments(ctx context.Context,
	plan *datapb.CompactionPlan,
	collectionID, partitionID, maxRows int64,
@@ -43,15 +62,15 @@ func mergeSortMultipleSegments(ctx context.Context,
	logIDAlloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64)
	compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
	mWriter := NewMultiSegmentWriter(binlogIO, compAlloc, plan, maxRows, partitionID, collectionID, bm25FieldIds)
+	writer := &segmentWriterWrapper{MultiSegmentWriter: mWriter}

	pkField, err := typeutil.GetPrimaryFieldSchema(plan.GetSchema())
	if err != nil {
		log.Warn("failed to get pk field from schema")
		return nil, err
	}

-	// SegmentDeserializeReaderTest(binlogPaths, t.binlogIO, writer.GetPkID())
-	segmentReaders := make([]*SegmentDeserializeReader, len(binlogs))
+	segmentReaders := make([]storage.RecordReader, len(binlogs))
	segmentFilters := make([]*EntityFilter, len(binlogs))
	for i, s := range binlogs {
		var binlogBatchCount int
@@ -75,7 +94,7 @@
			}
			binlogPaths[idx] = batchPaths
		}
-		segmentReaders[i] = NewSegmentDeserializeReader(ctx, binlogPaths, binlogIO, pkField.GetFieldID(), bm25FieldIds)
+		segmentReaders[i] = NewSegmentRecordReader(ctx, binlogPaths, binlogIO)
		deltalogPaths := make([]string, 0)
		for _, d := range s.GetDeltalogs() {
			for _, l := range d.GetBinlogs() {
@@ -89,57 +108,26 @@
		segmentFilters[i] = newEntityFilter(delta, collectionTtl, currentTime)
	}

-	advanceRow := func(i int) (*storage.Value, error) {
-		for {
-			v, err := segmentReaders[i].Next()
-			if err != nil {
-				return nil, err
-			}
-
-			if segmentFilters[i].Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
-				continue
-			}
-
-			return v, nil
-		}
-	}
-
-	pq := make(PriorityQueue, 0)
-	heap.Init(&pq)
-
-	for i := range segmentReaders {
-		v, err := advanceRow(i)
-		if err != nil {
-			log.Warn("compact wrong, failed to advance row", zap.Error(err))
-			return nil, err
-		}
-		heap.Push(&pq, &PQItem{
-			Value: v,
-			Index: i,
-		})
-	}
-
-	for pq.Len() > 0 {
-		smallest := heap.Pop(&pq).(*PQItem)
-		v := smallest.Value
-
-		err := mWriter.Write(v)
-		if err != nil {
-			log.Warn("compact wrong, failed to writer row", zap.Error(err))
-			return nil, err
-		}
-
-		iv, err := advanceRow(smallest.Index)
-		if err != nil && err != sio.EOF {
-			return nil, err
-		}
-		if err == nil {
-			next := &PQItem{
-				Value: iv,
-				Index: smallest.Index,
-			}
-			heap.Push(&pq, next)
-		}
-	}
+	var predicate func(r storage.Record, ri, i int) bool
+	switch pkField.DataType {
+	case schemapb.DataType_Int64:
+		predicate = func(r storage.Record, ri, i int) bool {
+			pk := r.Column(pkField.FieldID).(*array.Int64).Value(i)
+			ts := r.Column(common.TimeStampField).(*array.Int64).Value(i)
+			return segmentFilters[ri].Filtered(pk, uint64(ts))
+		}
+	case schemapb.DataType_VarChar:
+		predicate = func(r storage.Record, ri, i int) bool {
+			pk := r.Column(pkField.FieldID).(*array.String).Value(i)
+			ts := r.Column(common.TimeStampField).(*array.Int64).Value(i)
+			return segmentFilters[ri].Filtered(pk, uint64(ts))
+		}
+	default:
+		log.Warn("compaction only support int64 and varchar pk field")
+	}
+
+	if _, err = storage.MergeSort(segmentReaders, pkField.FieldID, writer, predicate); err != nil {
+		return nil, err
+	}

	res, err := mWriter.Finish()
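The removed code above implemented the k-way merge inline: one boxed row per heap entry, advanced stream by stream. storage.MergeSort keeps the same heap-plus-predicate structure but operates on record batches, so rows are never individually serialized or deserialized. Below is a simplified row-level model of that structure, using plain int64 slices in place of Arrow records — illustrative only, not the storage.MergeSort implementation:

```go
package main

import (
	"container/heap"
	"fmt"
)

// item tracks the head row of one input stream.
type item struct {
	pk  int64 // sort key (primary key)
	src int   // which input stream the row came from
	off int   // position within that stream
}

type minHeap []item

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].pk < h[j].pk }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(item)) }
func (h *minHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// mergeSorted merges pre-sorted streams into one sorted output, dropping
// rows the predicate rejects (e.g. deleted or TTL-expired entries).
func mergeSorted(streams [][]int64, filtered func(src int, pk int64) bool) []int64 {
	h := &minHeap{}
	heap.Init(h)
	for s, rows := range streams {
		if len(rows) > 0 {
			heap.Push(h, item{pk: rows[0], src: s, off: 0})
		}
	}
	out := []int64{}
	for h.Len() > 0 {
		it := heap.Pop(h).(item)
		if !filtered(it.src, it.pk) {
			out = append(out, it.pk)
		}
		if next := it.off + 1; next < len(streams[it.src]) {
			heap.Push(h, item{pk: streams[it.src][next], src: it.src, off: next})
		}
	}
	return out
}

func main() {
	streams := [][]int64{{1, 4, 7}, {2, 5, 8}, {3, 6, 9}}
	drop := map[int64]bool{4: true, 8: true} // pretend these rows were deleted
	fmt.Println(mergeSorted(streams, func(_ int, pk int64) bool { return drop[pk] }))
	// Output: [1 2 3 5 6 7 9]
}
```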
86 changes: 0 additions & 86 deletions internal/datanode/compaction/segment_reader_from_binlogs.go

This file was deleted.

31 changes: 31 additions & 0 deletions internal/datanode/compaction/segment_record_reader.go
@@ -0,0 +1,31 @@
+package compaction
+
+import (
+	"context"
+	"io"
+
+	"github.com/samber/lo"
+
+	binlogIO "github.com/milvus-io/milvus/internal/flushcommon/io"
+	"github.com/milvus-io/milvus/internal/storage"
+)
+
+func NewSegmentRecordReader(ctx context.Context, binlogPaths [][]string, binlogIO binlogIO.BinlogIO) storage.RecordReader {
+	pos := 0
+	return &storage.CompositeBinlogRecordReader{
+		BlobsReader: func() ([]*storage.Blob, error) {
+			if pos >= len(binlogPaths) {
+				return nil, io.EOF
+			}
+			bytesArr, err := binlogIO.Download(ctx, binlogPaths[pos])
+			if err != nil {
+				return nil, err
+			}
+			blobs := lo.Map(bytesArr, func(v []byte, i int) *storage.Blob {
+				return &storage.Blob{Key: binlogPaths[pos][i], Value: v}
+			})
+			pos++
+			return blobs, nil
+		},
+	}
+}
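BlobsReader is a pull-style closure: each call downloads and returns the blobs for the next batch of binlog paths, and io.EOF tells CompositeBinlogRecordReader that the input is exhausted, so batches are fetched lazily as the merge consumes them. A self-contained sketch of the same contract, with plain strings standing in for downloaded blobs (hypothetical names, not the Milvus API):

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// batchReader models the BlobsReader contract: each call yields the next
// batch, and io.EOF reports that all batches have been consumed.
type batchReader func() ([]string, error)

// newBatchReader returns a closure that walks batches one download-sized
// group at a time, keeping only a cursor between calls.
func newBatchReader(batches [][]string) batchReader {
	pos := 0
	return func() ([]string, error) {
		if pos >= len(batches) {
			return nil, io.EOF
		}
		b := batches[pos]
		pos++ // advance only after the current batch has been captured
		return b, nil
	}
}

func main() {
	next := newBatchReader([][]string{{"log-1", "log-2"}, {"log-3"}})
	for {
		batch, err := next()
		if errors.Is(err, io.EOF) {
			break
		}
		fmt.Println(batch)
	}
}
```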
