
Commit 13af6e7

Determine the number of buffers based on the resource limits of the DataNode

Signed-off-by: Cai Zhang <[email protected]>
xiaocai2333 committed Dec 4, 2024
1 parent 319f549 commit 13af6e7
Showing 4 changed files with 143 additions and 12 deletions.
configs/milvus.yaml: 2 changes (1 addition & 1 deletion)
@@ -669,7 +669,7 @@ dataNode:
slot:
slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode
clusteringCompaction:
- memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.
+ memoryBufferRatio: 0.3 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.
workPoolSize: 8 # worker pool size for one clustering compaction job.
ip: # TCP/IP address of dataNode. If not specified, use the first unicastable address
port: 21124 # TCP port of dataNode
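The ratio above is only a fraction; the compactor itself works with an absolute memoryBufferSize, which the new code below divides by expectedBinlogSize to bound the number of cluster buffers. How the ratio becomes an absolute size is not part of this diff, so the following is only a hypothetical sketch (usableMemory and bufferSizeFromRatio are illustrative names, not Milvus APIs):

package main

import "fmt"

// bufferSizeFromRatio is a hypothetical helper: it applies a ratio such as
// dataNode.clusteringCompaction.memoryBufferRatio to the memory the DataNode may use.
func bufferSizeFromRatio(usableMemory int64, ratio float64) int64 {
    return int64(float64(usableMemory) * ratio)
}

func main() {
    // e.g. 8 GiB of usable memory with the new default ratio of 0.3
    fmt.Println(bufferSizeFromRatio(8<<30, 0.3)) // roughly 2.4 GiB for the clustering compaction buffer
}
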
internal/datanode/compaction/clustering_compactor.go: 75 changes (65 additions & 10 deletions)
@@ -59,6 +59,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const (
expectedBinlogSize = 16 * 1024 * 1024
)

var _ Compactor = (*clusteringCompactionTask)(nil)

type clusteringCompactionTask struct {
@@ -525,6 +529,7 @@ func (t *clusteringCompactionTask) mappingSegment(
fieldBinlogPaths = append(fieldBinlogPaths, ps)
}

+ var offset int64 = -1
for _, paths := range fieldBinlogPaths {
allValues, err := t.binlogIO.Download(ctx, paths)
if err != nil {
@@ -540,7 +545,6 @@
return err
}

- var offset int64 = -1
for {
err := pkIter.Next()
if err != nil {
@@ -1032,6 +1036,7 @@ func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[inter
zap.Int64("collectionID", t.GetCollection()),
zap.Int64("partitionID", t.partitionID),
zap.Int("segments", len(inputSegments)),
zap.Int("clustering num", len(analyzeDict)),
zap.Duration("elapse", time.Since(analyzeStart)))
return analyzeDict, nil
}
@@ -1146,19 +1151,42 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
return analyzeResult, nil
}

- func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]interface{} {
- keys := lo.MapToSlice(dict, func(k interface{}, _ int64) interface{} {
- return k
- })
- sort.Slice(keys, func(i, j int) bool {
- return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[j]))
- })
+ func (t *clusteringCompactionTask) generatedScalarPlan(maxRows, preferRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
buckets := make([][]interface{}, 0)
currentBucket := make([]interface{}, 0)
var currentBucketSize int64 = 0
for _, key := range keys {
// todo can optimize
if dict[key] > preferRows {
if len(currentBucket) != 0 {
buckets = append(buckets, currentBucket)
currentBucket = make([]interface{}, 0)
currentBucketSize = 0
}
buckets = append(buckets, []interface{}{key})
} else if currentBucketSize+dict[key] > maxRows {
buckets = append(buckets, currentBucket)
currentBucket = []interface{}{key}
currentBucketSize = dict[key]
} else if currentBucketSize+dict[key] > preferRows {
currentBucket = append(currentBucket, key)
buckets = append(buckets, currentBucket)
currentBucket = make([]interface{}, 0)
currentBucketSize = 0
} else {
currentBucket = append(currentBucket, key)
currentBucketSize += dict[key]
}
}
buckets = append(buckets, currentBucket)
return buckets
}

func (t *clusteringCompactionTask) generatedScalarPlanByDataNodeMemory(maxRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
buckets := make([][]interface{}, 0)
currentBucket := make([]interface{}, 0)
var currentBucketSize int64 = 0
- maxRows := t.plan.MaxSegmentRows
- preferRows := t.plan.PreferSegmentRows
+ preferRows := int64(float64(maxRows) * 0.7)
for _, key := range keys {
// todo can optimize
if dict[key] > preferRows {
@@ -1186,6 +1214,33 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in
return buckets
}

func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
bufferNumBySegmentMaxRows := totalRows / t.plan.MaxSegmentRows
bufferNumBySegmentMemoryBuffer := t.memoryBufferSize / expectedBinlogSize
log.Info("switchPolicyForScalarPlan", zap.Int64("totalRows", totalRows),
zap.Int64("bufferNumBySegmentMaxRows", bufferNumBySegmentMaxRows),
zap.Int64("bufferNumBySegmentMemoryBuffer", bufferNumBySegmentMemoryBuffer))
if bufferNumBySegmentMemoryBuffer > bufferNumBySegmentMaxRows {
return t.generatedScalarPlan(t.plan.GetMaxSegmentRows(), t.plan.GetPreferSegmentRows(), keys, dict)
}

maxRows := totalRows / bufferNumBySegmentMemoryBuffer
return t.generatedScalarPlan(maxRows, int64(float64(maxRows)*paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()), keys, dict)
}

func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]interface{} {
totalRows := int64(0)
keys := lo.MapToSlice(dict, func(k interface{}, v int64) interface{} {
totalRows += v
return k
})
sort.Slice(keys, func(i, j int) bool {
return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[j]))
})

return t.switchPolicyForScalarPlan(totalRows, keys, dict)
}

func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) {
var segmentID int64
var err error
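Summing up the new logic above: switchPolicyForScalarPlan caps the number of cluster buffers at whichever is smaller, the count implied by the segment row limit or the count the DataNode's memory buffer can sustain at roughly one expected binlog (16 MiB) per buffer; when memory is the tighter limit, each bucket is enlarged so fewer buffers are needed. A condensed, runnable sketch of that decision, using the numbers from the new test below (pickBucketSizing and its parameters are illustrative, not the Milvus API):

package main

import "fmt"

const expectedBinlogSize = 16 * 1024 * 1024 // mirrors the new constant in clustering_compactor.go

// pickBucketSizing sketches switchPolicyForScalarPlan: choose maxRows/preferRows for
// generatedScalarPlan based on whichever limit allows fewer buffers.
func pickBucketSizing(totalRows, maxSegmentRows, preferSegmentRows, memoryBufferSize int64, preferRatio float64) (int64, int64) {
    bufferNumBySegmentMaxRows := totalRows / maxSegmentRows
    bufferNumBySegmentMemoryBuffer := memoryBufferSize / expectedBinlogSize
    if bufferNumBySegmentMemoryBuffer > bufferNumBySegmentMaxRows {
        // Memory is not the bottleneck: keep the plan's configured segment sizing.
        return maxSegmentRows, preferSegmentRows
    }
    // Memory-limited: enlarge each bucket so only bufferNumBySegmentMemoryBuffer buffers are needed.
    maxRows := totalRows / bufferNumBySegmentMemoryBuffer
    return maxRows, int64(float64(maxRows) * preferRatio)
}

func main() {
    // Numbers from TestScalarCompactionNormalByMemoryLimit: 10240 rows, MaxSegmentRows=3000,
    // a 32 MiB memory buffer and preferRatio=1. 32 MiB / 16 MiB = 2 buffers, which is fewer
    // than 10240/3000 = 3, so each bucket grows to 10240/2 = 5120 rows.
    fmt.Println(pickBucketSizing(10240, 3000, 3000, 32*1024*1024, 1.0)) // 5120 5120
}
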
internal/datanode/compaction/clustering_compactor_test.go: 76 changes (76 additions & 0 deletions)
@@ -19,6 +19,7 @@ package compaction
import (
"context"
"fmt"
"sync"
"testing"
"time"

@@ -232,6 +233,81 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
s.Equal(totalRowNum, statsRowNum)
}

func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit() {
schema := genCollectionSchema()
var segmentID int64 = 1001
segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID)
s.Require().NoError(err)
for i := 0; i < 10240; i++ {
v := storage.Value{
PK: storage.NewInt64PrimaryKey(int64(i)),
Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
Value: genRow(int64(i)),
}
err = segWriter.Write(&v)
s.Require().NoError(err)
}
segWriter.FlushAndIsFull()

kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
s.NoError(err)
var one sync.Once
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, strings []string) ([][]byte, error) {
// 32m, only two buffers can be generated
one.Do(func() {
s.task.memoryBufferSize = 32 * 1024 * 1024
})
return lo.Values(kvs), nil
})

s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
{
SegmentID: segmentID,
FieldBinlogs: lo.Values(fBinlogs),
},
}

s.task.plan.Schema = genCollectionSchema()
s.task.plan.ClusteringKeyField = 100
s.task.plan.PreferSegmentRows = 3000
s.task.plan.MaxSegmentRows = 3000

// 8+8+8+4+7+4*4=51
// 51*1024 = 52224
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)

compactionResult, err := s.task.Compact()
s.Require().NoError(err)
s.Equal(2, len(s.task.clusterBuffers))
s.Equal(4, len(compactionResult.GetSegments()))
totalBinlogNum := 0
totalRowNum := int64(0)
for _, fb := range compactionResult.GetSegments()[0].GetInsertLogs() {
for _, b := range fb.GetBinlogs() {
totalBinlogNum++
if fb.GetFieldID() == 100 {
totalRowNum += b.GetEntriesNum()
}
}
}
statsBinlogNum := 0
statsRowNum := int64(0)
for _, sb := range compactionResult.GetSegments()[0].GetField2StatslogPaths() {
for _, b := range sb.GetBinlogs() {
statsBinlogNum++
statsRowNum += b.GetEntriesNum()
}
}
s.Equal(3, totalBinlogNum/len(schema.GetFields()))
s.Equal(1, statsBinlogNum)
s.Equal(totalRowNum, statsRowNum)
}
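
A quick check of the sizing comment in the test above: at roughly 51 bytes per row, 1024 rows come to 52224 bytes, so capping BinLogMaxSize at 52223 forces the writer to cut a binlog every 1024 rows (a throwaway arithmetic sketch, not Milvus code):

package main

import "fmt"

func main() {
    // Per-row estimate from the test comment: 8+8+8+4+7+4*4 = 51 bytes.
    rowSize := 8 + 8 + 8 + 4 + 7 + 4*4
    fmt.Println(rowSize * 1024) // 52224; BinLogMaxSize is set one byte lower, to 52223
}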

func (s *ClusteringCompactionTaskSuite) TestCheckBuffersAfterCompaction() {
s.Run("no leak", func() {
task := &clusteringCompactionTask{clusterBuffers: []*ClusterBuffer{{}}}
pkg/util/paramtable/component_param.go: 2 changes (1 addition & 1 deletion)
@@ -4537,7 +4537,7 @@ if this parameter <= 0, will set it as 10`,
Key: "dataNode.clusteringCompaction.memoryBufferRatio",
Version: "2.4.6",
Doc: "The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.",
DefaultValue: "0.1",
DefaultValue: "0.3",
PanicIfEmpty: false,
Export: true,
}
