Skip to content

Commit

Permalink
Guarantee flushed segments are all indexed (milvus-io#19062)
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <[email protected]>

Signed-off-by: yah01 <[email protected]>
  • Loading branch information
yah01 authored Sep 16, 2022
1 parent 3998752 commit 11b352c
Show file tree
Hide file tree
Showing 22 changed files with 1,675 additions and 258 deletions.
4 changes: 3 additions & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,7 @@ ignore:
- "**/*.pb.go"
- "**/*.proto"
- "internal/metastore/db/dbmodel/mocks/.*"
- "internal/mocks/.*"
- "internal/mocks"



18 changes: 14 additions & 4 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -68,16 +69,23 @@ type compactionTrigger struct {
quit chan struct{}
wg sync.WaitGroup
segRefer *SegmentReferenceManager
indexCoord types.IndexCoord
}

func newCompactionTrigger(meta *meta, compactionHandler compactionPlanContext, allocator allocator,
segRefer *SegmentReferenceManager) *compactionTrigger {
// newCompactionTrigger returns a compactionTrigger wired to the given
// dependencies.
//
// meta, compactionHandler, allocator, segRefer and indexCoord are stored on
// the trigger exactly as passed; no nil checks are performed here. Only the
// fields listed in the literal below are set, so every other field of
// compactionTrigger keeps its zero value.
func newCompactionTrigger(
meta *meta,
compactionHandler compactionPlanContext,
allocator allocator,
segRefer *SegmentReferenceManager,
indexCoord types.IndexCoord,
) *compactionTrigger {
return &compactionTrigger{
meta: meta,
allocator: allocator,
// Buffered channel: up to 100 signals can be queued before a send blocks.
signals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
segRefer: segRefer,
indexCoord: indexCoord,
}
}

Expand Down Expand Up @@ -127,7 +135,7 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
return
case <-t.globalTrigger.C:
cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ct, err := getCompactTime(cctx, t.allocator)
ct, err := GetCompactTime(cctx, t.allocator)
if err != nil {
log.Warn("unbale to get compaction time", zap.Error(err))
cancel()
Expand Down Expand Up @@ -228,6 +236,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
isSegmentHealthy(segment) &&
isFlush(segment) &&
IsParentDropped(t.meta, segment) &&
!segment.isCompacting // not compacting now
}) // m is list of chanPartSegments, which is channel-partition organized segments
for _, group := range m {
Expand Down Expand Up @@ -460,11 +469,12 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni
var res []*SegmentInfo
for _, s := range segments {
if !isSegmentHealthy(s) || !isFlush(s) || s.GetInsertChannel() != channel ||
s.GetPartitionID() != partitionID || s.isCompacting {
s.GetPartitionID() != partitionID || !IsParentDropped(t.meta, s) || s.isCompacting {
continue
}
res = append(res, s)
}

return res
}

Expand Down
110 changes: 106 additions & 4 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"testing"
"time"

"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -83,6 +85,7 @@ func Test_compactionTrigger_force(t *testing.T) {
compactTime *compactTime
}
Params.Init()
vecFieldID := int64(201)
tests := []struct {
name string
fields fields
Expand Down Expand Up @@ -150,6 +153,19 @@ func Test_compactionTrigger_force(t *testing.T) {
},
},
},
collections: map[int64]*datapb.CollectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
},
},
},
newMockAllocator(),
nil,
Expand Down Expand Up @@ -214,13 +230,19 @@ func Test_compactionTrigger_force(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
indexCoord := mocks.NewMockIndexCoord(t)
segmentIDs := make([]int64, 0)
for _, segment := range tt.fields.meta.segments.GetSegments() {
segmentIDs = append(segmentIDs, segment.GetID())
}
tr := &compactionTrigger{
meta: tt.fields.meta,
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
segRefer: tt.fields.segRefer,
indexCoord: indexCoord,
}
_, err := tr.forceTriggerCompaction(tt.args.collectionID, tt.args.compactTime)
assert.Equal(t, tt.wantErr, err != nil)
Expand All @@ -246,7 +268,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
compactTime *compactTime
}
Params.Init()

vecFieldID := int64(201)
segmentInfos := &SegmentsInfo{
segments: make(map[UniqueID]*SegmentInfo),
}
Expand Down Expand Up @@ -292,6 +314,19 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
fields{
&meta{
segments: segmentInfos,
collections: map[int64]*datapb.CollectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
},
},
},
newMockAllocator(),
nil,
Expand Down Expand Up @@ -355,13 +390,20 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
indexCoord := mocks.NewMockIndexCoord(t)
segmentIDs := make([]int64, 0)
for _, segment := range tt.fields.meta.segments.GetSegments() {
segmentIDs = append(segmentIDs, segment.GetID())
}

tr := &compactionTrigger{
meta: tt.fields.meta,
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
indexCoord: indexCoord,
}
_, err := tr.forceTriggerCompaction(tt.args.collectionID, tt.args.compactTime)
assert.Equal(t, tt.wantErr, err != nil)
Expand Down Expand Up @@ -397,6 +439,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
compactTime *compactTime
}
Params.Init()
vecFieldID := int64(201)
tests := []struct {
name string
fields fields
Expand Down Expand Up @@ -496,6 +539,19 @@ func Test_compactionTrigger_noplan(t *testing.T) {
},
},
},
collections: map[int64]*datapb.CollectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
},
},
},
newMockAllocator(),
make(chan *compactionSignal, 1),
Expand All @@ -512,13 +568,19 @@ func Test_compactionTrigger_noplan(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
indexCoord := mocks.NewMockIndexCoord(t)
segmentIDs := make([]int64, 0)
for _, segment := range tt.fields.meta.segments.GetSegments() {
segmentIDs = append(segmentIDs, segment.GetID())
}
tr := &compactionTrigger{
meta: tt.fields.meta,
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
indexCoord: indexCoord,
}
tr.start()
defer tr.stop()
Expand Down Expand Up @@ -550,6 +612,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
compactTime *compactTime
}
Params.Init()
vecFieldID := int64(201)
tests := []struct {
name string
fields fields
Expand Down Expand Up @@ -670,6 +733,19 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
},
},
},
collections: map[int64]*datapb.CollectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
},
},
},
newMockAllocator(),
make(chan *compactionSignal, 1),
Expand All @@ -686,13 +762,19 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
indexCoord := mocks.NewMockIndexCoord(t)
segmentIDs := make([]int64, 0)
for _, segment := range tt.fields.meta.segments.GetSegments() {
segmentIDs = append(segmentIDs, segment.GetID())
}
tr := &compactionTrigger{
meta: tt.fields.meta,
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
indexCoord: indexCoord,
}
tr.start()
defer tr.stop()
Expand Down Expand Up @@ -739,6 +821,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
10, 10, 10, 10, 10, 10, 10, 10, 10, 10,
}

vecFieldID := int64(201)
for i := UniqueID(0); i < 50; i++ {
info := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
Expand Down Expand Up @@ -774,6 +857,19 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
fields{
&meta{
segments: segmentInfos,
collections: map[int64]*datapb.CollectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
},
},
},
newMockAllocator(),
make(chan *compactionSignal, 1),
Expand All @@ -790,13 +886,19 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
indexCoord := mocks.NewMockIndexCoord(t)
segmentIDs := make([]int64, 0)
for _, segment := range tt.fields.meta.segments.GetSegments() {
segmentIDs = append(segmentIDs, segment.GetID())
}
tr := &compactionTrigger{
meta: tt.fields.meta,
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
indexCoord: indexCoord,
}
tr.start()
defer tr.stop()
Expand Down Expand Up @@ -840,7 +942,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
Params.Init()

trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(),
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}})
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil)

// Test too many files.
var binlogs []*datapb.FieldBinlog
Expand Down Expand Up @@ -973,7 +1075,7 @@ func Test_newCompactionTrigger(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator,
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}})
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil)
assert.Equal(t, tt.args.meta, got.meta)
assert.Equal(t, tt.args.compactionHandler, got.compactionHandler)
assert.Equal(t, tt.args.allocator, got.allocator)
Expand All @@ -984,7 +1086,7 @@ func Test_newCompactionTrigger(t *testing.T) {
func Test_handleSignal(t *testing.T) {

got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(),
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}})
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil)
signal := &compactionSignal{
segmentID: 1,
}
Expand Down
Loading

0 comments on commit 11b352c

Please sign in to comment.