Skip to content

Commit

Permalink
fix: Avoid acquire index meta's lock for each segment (#31723) (#31787)
Browse files Browse the repository at this point in the history
issue: #31662 #31409 
pr: #31723

during FilterIndexedSegment in GetRecoveryInfo, it try to acquire index
meta's read lock for every segment. when a collection has thousands of
segments, which may blocked for more than 10 seconds and even longer.
cause `AddSegmentIndex` may also triggered frequently, which try to get
the write lock.

This PR avoid acquire index meta's lock for each segment

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Apr 2, 2024
1 parent 17a847a commit 5bfe26f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 63 deletions.
51 changes: 25 additions & 26 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type indexMeta struct {
Expand Down Expand Up @@ -384,42 +385,40 @@ func (m *indexMeta) GetSegmentIndexState(collID, segmentID UniqueID, indexID Uni
return state
}

func (m *indexMeta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) *indexpb.SegmentIndexState {
func (m *indexMeta) GetIndexedSegments(collectionID int64, fieldIDs []UniqueID) []int64 {
m.RLock()
defer m.RUnlock()

state := &indexpb.SegmentIndexState{
SegmentID: segmentID,
State: commonpb.IndexState_IndexStateNone,
FailReason: "",
}
fieldIndexes, ok := m.indexes[collID]
fieldIndexes, ok := m.indexes[collectionID]
if !ok {
state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID)
return state
return nil
}

indexes, ok := m.segmentIndexes[segmentID]
if !ok {
state.FailReason = fmt.Sprintf("segment index not exist with ID: %d", segmentID)
state.State = commonpb.IndexState_Unissued
return state
}
fieldIDSet := typeutil.NewUniqueSet(fieldIDs...)

for indexID, index := range fieldIndexes {
if index.FieldID == fieldID && !index.IsDeleted {
if segIdx, ok := indexes[indexID]; ok {
state.IndexName = index.IndexName
state.State = segIdx.IndexState
state.FailReason = segIdx.FailReason
return state
checkSegmentState := func(indexes map[int64]*model.SegmentIndex) bool {
indexedFields := 0
for indexID, index := range fieldIndexes {
if !fieldIDSet.Contain(index.FieldID) || index.IsDeleted {
continue
}

if segIdx, ok := indexes[indexID]; ok && segIdx.IndexState == commonpb.IndexState_Finished {
indexedFields += 1
}
state.State = commonpb.IndexState_Unissued
return state
}

return indexedFields == fieldIDSet.Len()
}
state.FailReason = fmt.Sprintf("there is no index on fieldID: %d", fieldID)
return state

ret := make([]int64, 0)
for sid, indexes := range m.segmentIndexes {
if checkSegmentState(indexes) {
ret = append(ret, sid)
}
}

return ret
}

// GetIndexesForCollection gets all indexes info with the specified collection.
Expand Down
19 changes: 7 additions & 12 deletions internal/datacoord/index_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
})
}

func TestMeta_GetSegmentIndexStateOnField(t *testing.T) {
func TestMeta_GetIndexedSegment(t *testing.T) {
var (
collID = UniqueID(1)
partID = UniqueID(2)
Expand Down Expand Up @@ -614,23 +614,18 @@ func TestMeta_GetSegmentIndexStateOnField(t *testing.T) {
}

t.Run("success", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID)
assert.Equal(t, commonpb.IndexState_Finished, state.GetState())
segments := m.GetIndexedSegments(collID, []int64{fieldID})
assert.Len(t, segments, 1)
})

t.Run("no index on field", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID+1)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
segments := m.GetIndexedSegments(collID, []int64{fieldID + 1})
assert.Len(t, segments, 0)
})

t.Run("no index", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID+1, segID, fieldID+1)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
})

t.Run("segment not exist", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID+1, fieldID)
assert.Equal(t, commonpb.IndexState_Unissued, state.GetState())
segments := m.GetIndexedSegments(collID+1, []int64{fieldID})
assert.Len(t, segments, 0)
})
}

Expand Down
47 changes: 22 additions & 25 deletions internal/datacoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -70,49 +71,45 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
return nil
}

segmentMap := make(map[int64]*SegmentInfo)
collectionSegments := make(map[int64][]int64)
collectionSegments := lo.GroupBy(segments, func(segment *SegmentInfo) int64 {
return segment.GetCollectionID()
})

vecFieldIDs := make(map[int64][]int64)
for _, segment := range segments {
collectionID := segment.GetCollectionID()
segmentMap[segment.GetID()] = segment
collectionSegments[collectionID] = append(collectionSegments[collectionID], segment.GetID())
}
for collection := range collectionSegments {
ret := make([]*SegmentInfo, 0)
for collection, segmentList := range collectionSegments {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
coll, err := handler.GetCollection(ctx, collection)
cancel()
if err != nil {
log.Warn("failed to get collection schema", zap.Error(err))
continue
}

// get vector field id
vecFieldIDs := make([]int64, 0)
for _, field := range coll.Schema.GetFields() {
if typeutil.IsVectorType(field.GetDataType()) {
vecFieldIDs[collection] = append(vecFieldIDs[collection], field.GetFieldID())
vecFieldIDs = append(vecFieldIDs, field.GetFieldID())
}
}
}

indexedSegments := make([]*SegmentInfo, 0)
for _, segment := range segments {
if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped {
continue
}
// get indexed segments which finish build index on all vector field
indexed := mt.indexMeta.GetIndexedSegments(collection, vecFieldIDs)
if len(indexed) > 0 {
indexedSet := typeutil.NewUniqueSet(indexed...)
for _, segment := range segmentList {
if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped {
continue
}

hasUnindexedVecField := false
for _, fieldID := range vecFieldIDs[segment.GetCollectionID()] {
segmentIndexState := mt.indexMeta.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), fieldID)
if segmentIndexState.State != commonpb.IndexState_Finished {
hasUnindexedVecField = true
if indexedSet.Contain(segment.GetID()) {
ret = append(ret, segment)
}
}
}
if !hasUnindexedVecField {
indexedSegments = append(indexedSegments, segment)
}
}

return indexedSegments
return ret
}

func getZeroTime() time.Time {
Expand Down

0 comments on commit 5bfe26f

Please sign in to comment.