From 11b352c3aeaab252f331698f952a354ed910c1a3 Mon Sep 17 00:00:00 2001 From: yah01 Date: Fri, 16 Sep 2022 11:32:48 +0800 Subject: [PATCH] Guarantee flushed segments are all indexed (#19062) Signed-off-by: yah01 Signed-off-by: yah01 --- codecov.yml | 4 +- internal/datacoord/compaction_trigger.go | 18 +- internal/datacoord/compaction_trigger_test.go | 110 ++- internal/datacoord/garbage_collector.go | 56 +- internal/datacoord/garbage_collector_test.go | 19 +- internal/datacoord/handler.go | 70 +- internal/datacoord/meta.go | 43 ++ internal/datacoord/server.go | 9 +- internal/datacoord/server_test.go | 103 ++- internal/datacoord/services.go | 93 ++- internal/datacoord/util.go | 105 ++- internal/datacoord/util_test.go | 68 +- internal/distributed/datacoord/service.go | 23 +- .../distributed/datacoord/service_test.go | 9 + internal/mocks/mock_indexcoord.go | 704 ++++++++++++++++++ internal/querycoord/global_meta_broker.go | 5 - internal/querycoord/task_test.go | 2 + internal/types/types.go | 2 + .../testcases/test_collection.py | 4 +- .../testcases/test_compaction.py | 109 ++- tests/python_client/testcases/test_delete.py | 371 ++++++--- tests/python_client/testcases/test_utility.py | 6 +- 22 files changed, 1675 insertions(+), 258 deletions(-) create mode 100644 internal/mocks/mock_indexcoord.go diff --git a/codecov.yml b/codecov.yml index e280bb73e5cda..2ddb42494dad8 100644 --- a/codecov.yml +++ b/codecov.yml @@ -42,5 +42,7 @@ ignore: - "**/*.pb.go" - "**/*.proto" - "internal/metastore/db/dbmodel/mocks/.*" - - "internal/mocks/.*" + - "internal/mocks" + + diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 5a5f5f63ab58a..04089bcfc1c79 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + 
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/logutil" "go.uber.org/zap" ) @@ -68,16 +69,23 @@ type compactionTrigger struct { quit chan struct{} wg sync.WaitGroup segRefer *SegmentReferenceManager + indexCoord types.IndexCoord } -func newCompactionTrigger(meta *meta, compactionHandler compactionPlanContext, allocator allocator, - segRefer *SegmentReferenceManager) *compactionTrigger { +func newCompactionTrigger( + meta *meta, + compactionHandler compactionPlanContext, + allocator allocator, + segRefer *SegmentReferenceManager, + indexCoord types.IndexCoord, +) *compactionTrigger { return &compactionTrigger{ meta: meta, allocator: allocator, signals: make(chan *compactionSignal, 100), compactionHandler: compactionHandler, segRefer: segRefer, + indexCoord: indexCoord, } } @@ -127,7 +135,7 @@ func (t *compactionTrigger) startGlobalCompactionLoop() { return case <-t.globalTrigger.C: cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - ct, err := getCompactTime(cctx, t.allocator) + ct, err := GetCompactTime(cctx, t.allocator) if err != nil { log.Warn("unbale to get compaction time", zap.Error(err)) cancel() @@ -228,6 +236,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) && isSegmentHealthy(segment) && isFlush(segment) && + IsParentDropped(t.meta, segment) && !segment.isCompacting // not compacting now }) // m is list of chanPartSegments, which is channel-partition organized segments for _, group := range m { @@ -460,11 +469,12 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni var res []*SegmentInfo for _, s := range segments { if !isSegmentHealthy(s) || !isFlush(s) || s.GetInsertChannel() != channel || - s.GetPartitionID() != partitionID || s.isCompacting { + s.GetPartitionID() != partitionID || !IsParentDropped(t.meta, s) || s.isCompacting { continue } res = 
append(res, s) } + return res } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index c82b693938b17..16dbe29494faa 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -22,8 +22,10 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/stretchr/testify/assert" ) @@ -83,6 +85,7 @@ func Test_compactionTrigger_force(t *testing.T) { compactTime *compactTime } Params.Init() + vecFieldID := int64(201) tests := []struct { name string fields fields @@ -150,6 +153,19 @@ func Test_compactionTrigger_force(t *testing.T) { }, }, }, + collections: map[int64]*datapb.CollectionInfo{ + 2: { + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }, + }, }, newMockAllocator(), nil, @@ -214,6 +230,11 @@ func Test_compactionTrigger_force(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + indexCoord := mocks.NewMockIndexCoord(t) + segmentIDs := make([]int64, 0) + for _, segment := range tt.fields.meta.segments.GetSegments() { + segmentIDs = append(segmentIDs, segment.GetID()) + } tr := &compactionTrigger{ meta: tt.fields.meta, allocator: tt.fields.allocator, @@ -221,6 +242,7 @@ func Test_compactionTrigger_force(t *testing.T) { compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, segRefer: tt.fields.segRefer, + indexCoord: indexCoord, } _, err := tr.forceTriggerCompaction(tt.args.collectionID, tt.args.compactTime) assert.Equal(t, tt.wantErr, err != nil) @@ -246,7 +268,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { compactTime *compactTime } Params.Init() - + vecFieldID := 
int64(201) segmentInfos := &SegmentsInfo{ segments: make(map[UniqueID]*SegmentInfo), } @@ -292,6 +314,19 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { fields{ &meta{ segments: segmentInfos, + collections: map[int64]*datapb.CollectionInfo{ + 2: { + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }, + }, }, newMockAllocator(), nil, @@ -355,6 +390,12 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + indexCoord := mocks.NewMockIndexCoord(t) + segmentIDs := make([]int64, 0) + for _, segment := range tt.fields.meta.segments.GetSegments() { + segmentIDs = append(segmentIDs, segment.GetID()) + } + tr := &compactionTrigger{ meta: tt.fields.meta, allocator: tt.fields.allocator, @@ -362,6 +403,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, + indexCoord: indexCoord, } _, err := tr.forceTriggerCompaction(tt.args.collectionID, tt.args.compactTime) assert.Equal(t, tt.wantErr, err != nil) @@ -397,6 +439,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { compactTime *compactTime } Params.Init() + vecFieldID := int64(201) tests := []struct { name string fields fields @@ -496,6 +539,19 @@ func Test_compactionTrigger_noplan(t *testing.T) { }, }, }, + collections: map[int64]*datapb.CollectionInfo{ + 2: { + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }, + }, }, newMockAllocator(), make(chan *compactionSignal, 1), @@ -512,6 +568,11 @@ func Test_compactionTrigger_noplan(t *testing.T) { } for _, tt := range tests { 
t.Run(tt.name, func(t *testing.T) { + indexCoord := mocks.NewMockIndexCoord(t) + segmentIDs := make([]int64, 0) + for _, segment := range tt.fields.meta.segments.GetSegments() { + segmentIDs = append(segmentIDs, segment.GetID()) + } tr := &compactionTrigger{ meta: tt.fields.meta, allocator: tt.fields.allocator, @@ -519,6 +580,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, + indexCoord: indexCoord, } tr.start() defer tr.stop() @@ -550,6 +612,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) { compactTime *compactTime } Params.Init() + vecFieldID := int64(201) tests := []struct { name string fields fields @@ -670,6 +733,19 @@ func Test_compactionTrigger_smallfiles(t *testing.T) { }, }, }, + collections: map[int64]*datapb.CollectionInfo{ + 2: { + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }, + }, }, newMockAllocator(), make(chan *compactionSignal, 1), @@ -686,6 +762,11 @@ func Test_compactionTrigger_smallfiles(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + indexCoord := mocks.NewMockIndexCoord(t) + segmentIDs := make([]int64, 0) + for _, segment := range tt.fields.meta.segments.GetSegments() { + segmentIDs = append(segmentIDs, segment.GetID()) + } tr := &compactionTrigger{ meta: tt.fields.meta, allocator: tt.fields.allocator, @@ -693,6 +774,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) { compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, + indexCoord: indexCoord, } tr.start() defer tr.stop() @@ -739,6 +821,7 @@ func 
Test_compactionTrigger_noplan_random_size(t *testing.T) { 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, } + vecFieldID := int64(201) for i := UniqueID(0); i < 50; i++ { info := &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ @@ -774,6 +857,19 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { fields{ &meta{ segments: segmentInfos, + collections: map[int64]*datapb.CollectionInfo{ + 2: { + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }, + }, }, newMockAllocator(), make(chan *compactionSignal, 1), @@ -790,6 +886,11 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + indexCoord := mocks.NewMockIndexCoord(t) + segmentIDs := make([]int64, 0) + for _, segment := range tt.fields.meta.segments.GetSegments() { + segmentIDs = append(segmentIDs, segment.GetID()) + } tr := &compactionTrigger{ meta: tt.fields.meta, allocator: tt.fields.allocator, @@ -797,6 +898,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, + indexCoord: indexCoord, } tr.start() defer tr.stop() @@ -840,7 +942,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { Params.Init() trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(), - &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}) + &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil) // Test too many files. 
var binlogs []*datapb.FieldBinlog @@ -973,7 +1075,7 @@ func Test_newCompactionTrigger(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator, - &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}) + &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil) assert.Equal(t, tt.args.meta, got.meta) assert.Equal(t, tt.args.compactionHandler, got.compactionHandler) assert.Equal(t, tt.args.allocator, got.allocator) @@ -984,7 +1086,7 @@ func Test_newCompactionTrigger(t *testing.T) { func Test_handleSignal(t *testing.T) { got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), - &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}) + &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil) signal := &compactionSignal{ segmentID: 1, } diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index e4e26caf1f25e..ac308fd1e02a1 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -22,6 +22,9 @@ import ( "time" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/samber/lo" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -51,9 +54,10 @@ type GcOption struct { // garbageCollector handles garbage files in object storage // which could be dropped collection remanent or data node failure traces type garbageCollector struct { - option GcOption - meta *meta - segRefer *SegmentReferenceManager + option GcOption + meta *meta + segRefer *SegmentReferenceManager + indexCoord types.IndexCoord startOnce 
sync.Once stopOnce sync.Once @@ -62,14 +66,18 @@ type garbageCollector struct { } // newGarbageCollector create garbage collector with meta and option -func newGarbageCollector(meta *meta, segRefer *SegmentReferenceManager, opt GcOption) *garbageCollector { +func newGarbageCollector(meta *meta, + segRefer *SegmentReferenceManager, + indexCoord types.IndexCoord, + opt GcOption) *garbageCollector { log.Info("GC with option", zap.Bool("enabled", opt.enabled), zap.Duration("interval", opt.checkInterval), zap.Duration("missingTolerance", opt.missingTolerance), zap.Duration("dropTolerance", opt.dropTolerance)) return &garbageCollector{ - meta: meta, - segRefer: segRefer, - option: opt, - closeCh: make(chan struct{}), + meta: meta, + segRefer: segRefer, + indexCoord: indexCoord, + option: opt, + closeCh: make(chan struct{}), } } @@ -170,14 +178,40 @@ func (gc *garbageCollector) scan() { } func (gc *garbageCollector) clearEtcd() { - drops := gc.meta.SelectSegments(func(segment *SegmentInfo) bool { - return segment.GetState() == commonpb.SegmentState_Dropped && !gc.segRefer.HasSegmentLock(segment.ID) - }) + all := gc.meta.SelectSegments(func(si *SegmentInfo) bool { return true }) + drops := make(map[int64]*SegmentInfo, 0) + compactTo := make(map[int64]*SegmentInfo) + for _, segment := range all { + if segment.GetState() == commonpb.SegmentState_Dropped && !gc.segRefer.HasSegmentLock(segment.ID) { + drops[segment.GetID()] = segment + continue + } + for _, from := range segment.GetCompactionFrom() { + compactTo[from] = segment + } + } + + droppedCompactTo := make(map[*SegmentInfo]struct{}) + for id := range drops { + if to, ok := compactTo[id]; ok { + droppedCompactTo[to] = struct{}{} + } + } + indexedSegments := FilterInIndexedSegments(gc.meta, gc.indexCoord, lo.Keys(droppedCompactTo)...) 
+ indexedSet := make(typeutil.UniqueSet) + for _, segment := range indexedSegments { + indexedSet.Insert(segment.GetID()) + } for _, sinfo := range drops { if !gc.isExpire(sinfo.GetDroppedAt()) { continue } + // For compact A, B -> C, don't GC A or B if C is not indexed, + // guarantee replacing A, B with C won't downgrade performance + if to, ok := compactTo[sinfo.GetID()]; ok && !indexedSet.Contain(to.GetID()) { + continue + } logs := getLogs(sinfo) if gc.removeLogs(logs) { _ = gc.meta.DropSegment(sinfo.GetID()) diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 433b1bd59cf0e..3bfcea408d3e2 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -26,6 +26,7 @@ import ( "time" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" @@ -55,8 +56,10 @@ func Test_garbageCollector_basic(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, segRefer) + indexCoord := mocks.NewMockIndexCoord(t) + t.Run("normal gc", func(t *testing.T) { - gc := newGarbageCollector(meta, segRefer, GcOption{ + gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{ cli: cli, enabled: true, checkInterval: time.Millisecond * 10, @@ -73,7 +76,7 @@ func Test_garbageCollector_basic(t *testing.T) { }) t.Run("with nil cli", func(t *testing.T) { - gc := newGarbageCollector(meta, segRefer, GcOption{ + gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{ cli: nil, enabled: true, checkInterval: time.Millisecond * 10, @@ -118,6 +121,8 @@ func Test_garbageCollector_scan(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, segRefer) + indexCoord := mocks.NewMockIndexCoord(t) + t.Run("key is reference", func(t *testing.T) { segReferManager := &SegmentReferenceManager{ 
etcdKV: etcdKV, @@ -134,7 +139,7 @@ func Test_garbageCollector_scan(t *testing.T) { 2: 1, }, } - gc := newGarbageCollector(meta, segRefer, GcOption{ + gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{ cli: cli, enabled: true, checkInterval: time.Minute * 30, @@ -156,7 +161,7 @@ func Test_garbageCollector_scan(t *testing.T) { }) t.Run("missing all but save tolerance", func(t *testing.T) { - gc := newGarbageCollector(meta, segRefer, GcOption{ + gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{ cli: cli, enabled: true, checkInterval: time.Minute * 30, @@ -182,7 +187,7 @@ func Test_garbageCollector_scan(t *testing.T) { err = meta.AddSegment(segment) require.NoError(t, err) - gc := newGarbageCollector(meta, segRefer, GcOption{ + gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{ cli: cli, enabled: true, checkInterval: time.Minute * 30, @@ -211,7 +216,7 @@ func Test_garbageCollector_scan(t *testing.T) { err = meta.AddSegment(segment) require.NoError(t, err) - gc := newGarbageCollector(meta, segRefer, GcOption{ + gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{ cli: cli, enabled: true, checkInterval: time.Minute * 30, @@ -228,7 +233,7 @@ func Test_garbageCollector_scan(t *testing.T) { gc.close() }) t.Run("missing gc all", func(t *testing.T) { - gc := newGarbageCollector(meta, segRefer, GcOption{ + gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{ cli: cli, enabled: true, checkInterval: time.Minute * 30, diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index df78d7def2914..5e19c7b5f3407 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" ) @@ -45,49 +46,84 @@ func newServerHandler(s *Server) 
*ServerHandler { return &ServerHandler{s: s} } -// GetVChanPositions gets vchannel latest postitions with provided dml channel names +// GetVChanPositions gets vchannel latest positions with provided dml channel names, +// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments, +// the unflushed segments are actually the segments without index, even if they are flushed. func (h *ServerHandler) GetVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { // cannot use GetSegmentsByChannel since dropped segments are needed here segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool { return s.InsertChannel == channel.Name }) + segmentInfos := make(map[int64]*SegmentInfo) + indexedSegments := FilterInIndexedSegments(h.s.meta, h.s.indexCoord, segments...) + indexed := make(typeutil.UniqueSet) + for _, segment := range indexedSegments { + indexed.Insert(segment.GetID()) + } log.Info("GetSegmentsByChannel", zap.Any("collectionID", channel.CollectionID), zap.Any("channel", channel), zap.Any("numOfSegments", len(segments)), ) - var flushedIds []int64 - var unflushedIds []int64 - var droppedIds []int64 - var seekPosition *internalpb.MsgPosition + var ( + flushedIds = make(typeutil.UniqueSet) + unflushedIds = make(typeutil.UniqueSet) + droppedIds = make(typeutil.UniqueSet) + seekPosition *internalpb.MsgPosition + ) for _, s := range segments { if (partitionID > allPartitionID && s.PartitionID != partitionID) || (s.GetStartPosition() == nil && s.GetDmlPosition() == nil) { continue } - + segmentInfos[s.GetID()] = s if s.GetState() == commonpb.SegmentState_Dropped { - droppedIds = append(droppedIds, trimSegmentInfo(s.SegmentInfo).GetID()) - continue + droppedIds.Insert(s.GetID()) + } else if indexed.Contain(s.GetID()) { + flushedIds.Insert(s.GetID()) + } else { + unflushedIds.Insert(s.GetID()) + } + } + for id := range unflushedIds { + // Indexed segments are compacted to a raw segment, +
// replace it with the indexed ones + if !indexed.Contain(id) && + len(segmentInfos[id].GetCompactionFrom()) > 0 && + indexed.Contain(segmentInfos[id].GetCompactionFrom()...) { + flushedIds.Insert(segmentInfos[id].GetCompactionFrom()...) + unflushedIds.Remove(id) + droppedIds.Remove(segmentInfos[id].GetCompactionFrom()...) } + } - if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed { - flushedIds = append(flushedIds, trimSegmentInfo(s.SegmentInfo).GetID()) + for id := range flushedIds { + var segmentPosition *internalpb.MsgPosition + segment := segmentInfos[id] + if segment.GetDmlPosition() != nil { + segmentPosition = segment.GetDmlPosition() } else { - unflushedIds = append(unflushedIds, s.SegmentInfo.GetID()) + segmentPosition = segment.GetStartPosition() } + if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp { + seekPosition = segmentPosition + } + } + for id := range unflushedIds { var segmentPosition *internalpb.MsgPosition - if s.GetDmlPosition() != nil { - segmentPosition = s.GetDmlPosition() + segment := segmentInfos[id] + if segment.GetDmlPosition() != nil { + segmentPosition = segment.GetDmlPosition() } else { - segmentPosition = s.GetStartPosition() + segmentPosition = segment.GetStartPosition() } if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp { seekPosition = segmentPosition } } + // use collection start position when segment position is not found if seekPosition == nil { if channel.StartPositions == nil { @@ -105,9 +141,9 @@ func (h *ServerHandler) GetVChanPositions(channel *channel, partitionID UniqueID CollectionID: channel.CollectionID, ChannelName: channel.Name, SeekPosition: seekPosition, - FlushedSegmentIds: flushedIds, - UnflushedSegmentIds: unflushedIds, - DroppedSegmentIds: droppedIds, + FlushedSegmentIds: flushedIds.Collect(), + UnflushedSegmentIds: unflushedIds.Collect(), + DroppedSegmentIds: droppedIds.Collect(), } } diff --git 
a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 00068ce693290..49a6e51fcc37c 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -196,6 +196,14 @@ func (m *meta) GetSegment(segID UniqueID) *SegmentInfo { return nil } +// GetSegmentUnsafe returns segment info with provided id +// if no segment is found, nil will be returned +func (m *meta) GetSegmentUnsafe(segID UniqueID) *SegmentInfo { + m.RLock() + defer m.RUnlock() + return m.segments.GetSegment(segID) +} + // GetAllSegment returns segment info with provided id // different from GetSegment, this will return unhealthy segment as well func (m *meta) GetAllSegment(segID UniqueID) *SegmentInfo { @@ -590,6 +598,23 @@ func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID { return ret } +// GetSegmentsIDOfCollectionWithDropped returns all segment ids, dropped ones included, whose collection equals the provided `collectionID` +func (m *meta) GetSegmentsIDOfCollectionWithDropped(collectionID UniqueID) []UniqueID { + m.RLock() + defer m.RUnlock() + ret := make([]UniqueID, 0) + segments := m.segments.GetSegments() + for _, segment := range segments { + if segment != nil && + segment.GetState() != commonpb.SegmentState_SegmentStateNone && + segment.GetState() != commonpb.SegmentState_NotExist && + segment.CollectionID == collectionID { + ret = append(ret, segment.ID) + } + } + return ret +} + // GetSegmentsIDOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID` func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []UniqueID { m.RLock() @@ -604,6 +629,24 @@ func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []Un return ret } +// GetSegmentsIDOfPartitionWithDropped returns all segment ids, dropped ones included, whose collection & partition equal the provided `collectionID`, `partitionID` +func (m *meta) GetSegmentsIDOfPartitionWithDropped(collectionID, partitionID UniqueID) []UniqueID { + m.RLock() + defer m.RUnlock() + ret :=
make([]UniqueID, 0) + segments := m.segments.GetSegments() + for _, segment := range segments { + if segment != nil && + segment.GetState() != commonpb.SegmentState_SegmentStateNone && + segment.GetState() != commonpb.SegmentState_NotExist && + segment.CollectionID == collectionID && + segment.PartitionID == partitionID { + ret = append(ret, segment.ID) + } + } + return ret +} + // GetNumRowsOfPartition returns row count of segments belongs to provided collection & partition func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID) int64 { m.RLock() diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 21f9a782b26d3..4291c10850236 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -137,6 +137,7 @@ type Server struct { dataNodeCreator dataNodeCreatorFunc rootCoordClientCreator rootCoordCreatorFunc + indexCoord types.IndexCoord segReferManager *SegmentReferenceManager } @@ -335,6 +336,10 @@ func (s *Server) SetEtcdClient(client *clientv3.Client) { s.etcdCli = client } +func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) { + s.indexCoord = indexCoord +} + func (s *Server) createCompactionHandler() { s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh, s.segReferManager) s.compactionHandler.start() @@ -345,7 +350,7 @@ func (s *Server) stopCompactionHandler() { } func (s *Server) createCompactionTrigger() { - s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.segReferManager) + s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.segReferManager, s.indexCoord) s.compactionTrigger.start() } @@ -384,7 +389,7 @@ func (s *Server) initGarbageCollection() error { log.Info("local chunk manager init success") } - s.garbageCollector = newGarbageCollector(s.meta, s.segReferManager, GcOption{ + s.garbageCollector = newGarbageCollector(s.meta, s.segReferManager, 
s.indexCoord, GcOption{ cli: cli, enabled: Params.DataCoordCfg.EnableGarbageCollection, rootPath: Params.MinioCfg.RootPath, diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 9f22ca7b97ad6..e6f13a5c874f0 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -30,6 +30,7 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/minio/minio-go/v7" @@ -47,8 +48,10 @@ import ( "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/etcd" @@ -1058,7 +1061,10 @@ func TestSaveBinlogPaths(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.meta.AddCollection(&datapb.CollectionInfo{ID: 0}) + // vecFieldID := int64(201) + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: 0, + }) segments := []struct { id UniqueID @@ -1077,6 +1083,7 @@ func TestSaveBinlogPaths(t *testing.T) { err := svr.meta.AddSegment(NewSegmentInfo(s)) assert.Nil(t, err) } + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil) err := svr.channelManager.AddNode(0) assert.Nil(t, err) @@ -1205,7 +1212,18 @@ func TestDropVirtualChannel(t *testing.T) { defer closeTestServer(t, svr) - svr.meta.AddCollection(&datapb.CollectionInfo{ID: 0}) + vecFieldID := int64(201) + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: 0, + Schema: &schemapb.CollectionSchema{ + 
Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }) type testSegment struct { id UniqueID collectionID UniqueID @@ -1236,6 +1254,7 @@ func TestDropVirtualChannel(t *testing.T) { err := svr.meta.AddSegment(NewSegmentInfo(s)) assert.Nil(t, err) } + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil) // add non matched segments os := &datapb.SegmentInfo{ ID: maxOperationsPerTxn + 100, @@ -1660,6 +1679,23 @@ func TestGetVChannelPos(t *testing.T) { } err = svr.meta.AddSegment(NewSegmentInfo(s3)) assert.Nil(t, err) + mockResp := &indexpb.GetIndexInfoResponse{ + Status: &commonpb.Status{}, + SegmentInfo: map[int64]*indexpb.SegmentInfo{ + s1.ID: { + CollectionID: s1.CollectionID, + SegmentID: s1.ID, + EnableIndex: true, + IndexInfos: []*indexpb.IndexFilePathInfo{ + { + SegmentID: s1.ID, + FieldID: 2, + }, + }, + }, + }, + } + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil) t.Run("get unexisted channel", func(t *testing.T) { vchan := svr.handler.GetVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID) @@ -1905,12 +1941,43 @@ func TestGetRecoveryInfo(t *testing.T) { return newMockRootCoordService(), nil } + svr.meta.AddCollection(&datapb.CollectionInfo{ + Schema: newTestSchema(), + }) seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed) seg2 := createSegment(1, 0, 0, 100, 20, "vchan1", commonpb.SegmentState_Flushed) err := svr.meta.AddSegment(NewSegmentInfo(seg1)) assert.Nil(t, err) err = svr.meta.AddSegment(NewSegmentInfo(seg2)) assert.Nil(t, err) + mockResp := &indexpb.GetIndexInfoResponse{ + Status: &commonpb.Status{}, + SegmentInfo: map[int64]*indexpb.SegmentInfo{ + seg1.ID: { + CollectionID: seg1.CollectionID, + SegmentID: seg1.ID, + EnableIndex: true, + IndexInfos: []*indexpb.IndexFilePathInfo{ + { + SegmentID: seg1.ID, + FieldID: 2, 
+ }, + }, + }, + seg2.ID: { + CollectionID: seg2.CollectionID, + SegmentID: seg2.ID, + EnableIndex: true, + IndexInfos: []*indexpb.IndexFilePathInfo{ + { + SegmentID: seg2.ID, + FieldID: 2, + }, + }, + }, + }, + } + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil) req := &datapb.GetRecoveryInfoRequest{ CollectionID: 0, @@ -1939,6 +2006,7 @@ func TestGetRecoveryInfo(t *testing.T) { assert.Nil(t, err) err = svr.meta.AddSegment(NewSegmentInfo(seg2)) assert.Nil(t, err) + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil) req := &datapb.GetRecoveryInfoRequest{ CollectionID: 0, @@ -1957,6 +2025,10 @@ func TestGetRecoveryInfo(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) + svr.meta.AddCollection(&datapb.CollectionInfo{ + Schema: newTestSchema(), + }) + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) { return newMockRootCoordService(), nil } @@ -2003,13 +2075,30 @@ func TestGetRecoveryInfo(t *testing.T) { }, }, } - segment := createSegment(0, 0, 0, 100, 10, "ch1", commonpb.SegmentState_Flushed) + segment := createSegment(0, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Flushed) err := svr.meta.AddSegment(NewSegmentInfo(segment)) assert.Nil(t, err) + mockResp := &indexpb.GetIndexInfoResponse{ + Status: &commonpb.Status{}, + SegmentInfo: map[int64]*indexpb.SegmentInfo{ + segment.ID: { + CollectionID: segment.CollectionID, + SegmentID: segment.ID, + EnableIndex: true, + IndexInfos: []*indexpb.IndexFilePathInfo{ + { + SegmentID: segment.ID, + FieldID: 2, + }, + }, + }, + }, + } + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil) err = svr.channelManager.AddNode(0) assert.Nil(t, err) - err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0}) + err = 
svr.channelManager.Watch(&channel{Name: "vchan1", CollectionID: 0}) assert.Nil(t, err) sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq) @@ -2018,7 +2107,7 @@ func TestGetRecoveryInfo(t *testing.T) { req := &datapb.GetRecoveryInfoRequest{ CollectionID: 0, - PartitionID: 0, + PartitionID: 1, } resp, err := svr.GetRecoveryInfo(context.TODO(), req) assert.Nil(t, err) @@ -2045,6 +2134,7 @@ func TestGetRecoveryInfo(t *testing.T) { assert.Nil(t, err) err = svr.meta.AddSegment(NewSegmentInfo(seg2)) assert.Nil(t, err) + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil) req := &datapb.GetRecoveryInfoRequest{ CollectionID: 0, @@ -2876,6 +2966,9 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) { return newMockRootCoordService(), nil } + indexCoord := mocks.NewMockIndexCoord(t) + indexCoord.EXPECT().GetIndexInfos(context.Background(), mock.Anything).Return(nil, nil).Maybe() + svr.indexCoord = indexCoord err = svr.Init() assert.Nil(t, err) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 5fe624504346d..b363a2bf63529 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -443,7 +443,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath cctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) defer cancel() - ct, err := getCompactTime(cctx, s.allocator) + ct, err := GetCompactTime(cctx, s.allocator) if err == nil { err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(), segmentID, segment.GetInsertChannel(), ct) @@ -587,9 +587,11 @@ func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentS func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) 
(*datapb.GetRecoveryInfoResponse, error) { collectionID := req.GetCollectionID() partitionID := req.GetPartitionID() - log.Info("receive get recovery info request", + log := log.With( zap.Int64("collectionID", collectionID), - zap.Int64("partitionID", partitionID)) + zap.Int64("partitionID", partitionID), + ) + log.Info("receive get recovery info request") resp := &datapb.GetRecoveryInfoResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -599,37 +601,58 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf resp.Status.Reason = serverNotServingErrMsg return resp, nil } - segmentIDs := s.meta.GetSegmentsIDOfPartition(collectionID, partitionID) + + dresp, err := s.rootCoordClient.DescribeCollection(s.ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeCollection, + SourceID: Params.DataCoordCfg.GetNodeID(), + }, + CollectionID: collectionID, + }) + if err = VerifyResponse(dresp, err); err != nil { + log.Error("get collection info from rootcoord failed", + zap.Error(err)) + + resp.Status.Reason = err.Error() + return resp, nil + } + channels := dresp.GetVirtualChannelNames() + channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) + flushedIDs := make(typeutil.UniqueSet) + for _, c := range channels { + channelInfo := s.handler.GetVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID) + channelInfos = append(channelInfos, channelInfo) + log.Debug("datacoord append channelInfo in GetRecoveryInfo", + zap.Any("channelInfo", channelInfo), + ) + + flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...) 
+ } + segment2Binlogs := make(map[UniqueID][]*datapb.FieldBinlog) segment2StatsBinlogs := make(map[UniqueID][]*datapb.FieldBinlog) segment2DeltaBinlogs := make(map[UniqueID][]*datapb.FieldBinlog) segment2InsertChannel := make(map[UniqueID]string) segmentsNumOfRows := make(map[UniqueID]int64) - - flushedIDs := make(map[int64]struct{}) - for _, id := range segmentIDs { - segment := s.meta.GetSegment(id) + for id := range flushedIDs { + segment := s.meta.GetSegmentUnsafe(id) if segment == nil { errMsg := fmt.Sprintf("failed to get segment %d", id) log.Error(errMsg) resp.Status.Reason = errMsg return resp, nil } - if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing { + if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped { continue } segment2InsertChannel[segment.ID] = segment.InsertChannel binlogs := segment.GetBinlogs() if len(binlogs) == 0 { + flushedIDs.Remove(id) continue } - _, ok := flushedIDs[id] - if !ok { - flushedIDs[id] = struct{}{} - } - field2Binlog := make(map[UniqueID][]*datapb.Binlog) for _, field := range binlogs { field2Binlog[field.GetFieldID()] = append(field2Binlog[field.GetFieldID()], field.GetBinlogs()...) 
@@ -677,35 +700,8 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf binlogs = append(binlogs, sbl) } - dresp, err := s.rootCoordClient.DescribeCollection(s.ctx, &milvuspb.DescribeCollectionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DescribeCollection, - SourceID: Params.DataCoordCfg.GetNodeID(), - }, - CollectionID: collectionID, - }) - if err = VerifyResponse(dresp, err); err != nil { - log.Error("get collection info from rootcoord failed", - zap.Int64("collectionID", collectionID), - zap.Error(err)) - - resp.Status.Reason = err.Error() - return resp, nil - } - - channels := dresp.GetVirtualChannelNames() - channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) - for _, c := range channels { - channelInfo := s.handler.GetVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID) - channelInfos = append(channelInfos, channelInfo) - log.Debug("datacoord append channelInfo in GetRecoveryInfo", - zap.Any("collectionID", collectionID), - zap.Any("channelInfo", channelInfo), - ) - } - - resp.Binlogs = binlogs resp.Channels = channelInfos + resp.Binlogs = binlogs resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } @@ -722,21 +718,24 @@ func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS partitionID := req.GetPartitionID() log.Debug("received get flushed segments request", zap.Int64("collectionID", collectionID), - zap.Int64("partitionID", partitionID)) + zap.Int64("partitionID", partitionID), + ) if s.isClosed() { resp.Status.Reason = serverNotServingErrMsg return resp, nil } var segmentIDs []UniqueID if partitionID < 0 { - segmentIDs = s.meta.GetSegmentsIDOfCollection(collectionID) + segmentIDs = s.meta.GetSegmentsIDOfCollectionWithDropped(collectionID) } else { - segmentIDs = s.meta.GetSegmentsIDOfPartition(collectionID, partitionID) + segmentIDs = s.meta.GetSegmentsIDOfPartitionWithDropped(collectionID, partitionID) } ret := make([]UniqueID, 0, 
len(segmentIDs)) for _, id := range segmentIDs { segment := s.meta.GetSegment(id) - if segment != nil && segment.GetState() != commonpb.SegmentState_Flushed { + if segment != nil && + segment.GetState() != commonpb.SegmentState_Dropped && + segment.GetState() != commonpb.SegmentState_Flushed { continue } @@ -872,7 +871,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa return resp, nil } - ct, err := getCompactTime(ctx, s.allocator) + ct, err := GetCompactTime(ctx, s.allocator) if err != nil { log.Warn("failed to get compact time", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err)) resp.Status.Reason = err.Error() diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 33f416013dfef..63d75f92149e0 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -19,10 +19,17 @@ package datacoord import ( "context" "errors" + "sync" "time" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/tsoutil" + "github.com/milvus-io/milvus/internal/util/typeutil" + "go.uber.org/zap" ) // Response response interface for verification @@ -72,7 +79,7 @@ func failResponseWithCode(status *commonpb.Status, errCode commonpb.ErrorCode, r status.Reason = reason } -func getCompactTime(ctx context.Context, allocator allocator) (*compactTime, error) { +func GetCompactTime(ctx context.Context, allocator allocator) (*compactTime, error) { ts, err := allocator.allocTimestamp(ctx) if err != nil { return nil, err @@ -91,3 +98,99 @@ func getCompactTime(ctx context.Context, allocator allocator) (*compactTime, err // no expiration time return &compactTime{ttRetentionLogic, 0}, nil } + +func FilterInIndexedSegments(meta *meta, indexCoord types.IndexCoord, segments 
...*SegmentInfo) []*SegmentInfo { + if len(segments) == 0 { + return nil + } + + segmentMap := make(map[int64]*SegmentInfo) + collectionSegments := make(map[int64][]int64) + // TODO(yah01): This can't handle the case of multiple vector fields exist, + // modify it if we support multiple vector fields. + vecFieldID := make(map[int64]int64) + for _, segment := range segments { + collectionID := segment.GetCollectionID() + segmentMap[segment.GetID()] = segment + collectionSegments[collectionID] = append(collectionSegments[collectionID], segment.GetID()) + } + for collection := range collectionSegments { + schema := meta.GetCollection(collection).GetSchema() + for _, field := range schema.GetFields() { + if field.GetDataType() == schemapb.DataType_BinaryVector || + field.GetDataType() == schemapb.DataType_FloatVector { + vecFieldID[collection] = field.GetFieldID() + break + } + } + } + + wg := sync.WaitGroup{} + indexedSegmentCh := make(chan []int64, len(segments)) + for _, segment := range segments { + segment := segment + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + resp, err := indexCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{ + CollectionID: segment.GetCollectionID(), + SegmentIDs: []int64{segment.GetID()}, + }) + if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + log.Warn("failed to get index of collection", + zap.Int64("collectionID", segment.GetCollectionID()), + zap.Int64("segmentID", segment.GetID())) + return + } + indexed := extractSegmentsWithVectorIndex(vecFieldID, resp.GetSegmentInfo()) + if len(indexed) == 0 { + log.Debug("no vector index for the segment", + zap.Int64("collectionID", segment.GetCollectionID()), + zap.Int64("segmentID", segment.GetID())) + return + } + indexedSegmentCh <- indexed + }() + } + wg.Wait() + close(indexedSegmentCh) + + indexedSegments := make([]*SegmentInfo, 0) + for segments := range 
indexedSegmentCh { + for _, segment := range segments { + if info, ok := segmentMap[segment]; ok { + delete(segmentMap, segment) + indexedSegments = append(indexedSegments, info) + } + } + } + + return indexedSegments +} + +func extractSegmentsWithVectorIndex(vecFieldID map[int64]int64, segentIndexInfo map[int64]*indexpb.SegmentInfo) []int64 { + indexedSegments := make(typeutil.UniqueSet) + for _, indexInfo := range segentIndexInfo { + if !indexInfo.GetEnableIndex() { + continue + } + for _, index := range indexInfo.GetIndexInfos() { + if index.GetFieldID() == vecFieldID[indexInfo.GetCollectionID()] { + indexedSegments.Insert(indexInfo.GetSegmentID()) + break + } + } + } + return indexedSegments.Collect() +} + +func IsParentDropped(meta *meta, segment *SegmentInfo) bool { + for _, from := range segment.CompactionFrom { + if meta.GetSegment(from) != nil { + return false + } + } + return true +} diff --git a/internal/datacoord/util_test.go b/internal/datacoord/util_test.go index 09f7b839c5123..7d609bbc46637 100644 --- a/internal/datacoord/util_test.go +++ b/internal/datacoord/util_test.go @@ -23,12 +23,17 @@ import ( "time" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/util/tsoutil" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) -func TestVerifyResponse(t *testing.T) { +type UtilSuite struct { + suite.Suite +} + +func (suite *UtilSuite) TestVerifyResponse() { type testCase struct { resp interface{} err error @@ -104,14 +109,14 @@ func TestVerifyResponse(t *testing.T) { for _, c := range cases { r := VerifyResponse(c.resp, c.err) if c.equalValue { - assert.EqualValues(t, c.expected, r) + suite.EqualValues(c.expected, r) } else { - assert.Equal(t, c.expected, r) + suite.Equal(c.expected, r) } } } -func Test_getCompactTime(t *testing.T) { +func (suite *UtilSuite) TestGetCompactTime() 
{ Params.Init() Params.CommonCfg.RetentionDuration = 43200 // 5 days @@ -135,14 +140,59 @@ func Test_getCompactTime(t *testing.T) { }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := getCompactTime(context.TODO(), tt.args.allocator) - assert.Equal(t, tt.wantErr, err != nil) - assert.EqualValues(t, tt.want, got) + suite.Run(tt.name, func() { + got, err := GetCompactTime(context.TODO(), tt.args.allocator) + suite.Equal(tt.wantErr, err != nil) + suite.EqualValues(tt.want, got) }) } } +func (suite *UtilSuite) TestIsParentDropped() { + meta := &meta{ + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + State: commonpb.SegmentState_Flushed, + }, + }, + 3: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 3, + CompactionFrom: []int64{1}, + State: commonpb.SegmentState_Flushed, + }, + }, + 5: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 5, + CompactionFrom: []int64{1, 2}, + State: commonpb.SegmentState_Flushed, + }, + }, + 7: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 7, + CompactionFrom: []int64{2, 4}, + State: commonpb.SegmentState_Flushed, + }, + }, + }, + }, + } + + suite.True(IsParentDropped(meta, meta.GetSegment(1))) + suite.False(IsParentDropped(meta, meta.GetSegment(3))) + suite.False(IsParentDropped(meta, meta.GetSegment(5))) + suite.True(IsParentDropped(meta, meta.GetSegment(7))) +} + +func TestUtil(t *testing.T) { + suite.Run(t, new(UtilSuite)) +} + type fixedTSOAllocator struct { fixedTime time.Time } diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index 274e083633c6b..48a8a9517e98d 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -27,6 +27,7 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" + icc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client" 
clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc" @@ -59,7 +60,8 @@ type Server struct { wg sync.WaitGroup dataCoord types.DataCoordComponent - etcdCli *clientv3.Client + etcdCli *clientv3.Client + indexCoord types.IndexCoord grpcErrChan chan error grpcServer *grpc.Server @@ -98,6 +100,25 @@ func (s *Server) init() error { s.etcdCli = etcdCli s.dataCoord.SetEtcdClient(etcdCli) + if s.indexCoord == nil { + var err error + log.Debug("create IndexCoord client for DataCoord") + s.indexCoord, err = icc.NewClient(s.ctx, Params.EtcdCfg.MetaRootPath, etcdCli) + if err != nil { + log.Warn("failed to create IndexCoord client for DataCoord", zap.Error(err)) + return err + } + log.Debug("create IndexCoord client for DataCoord done") + } + + log.Debug("init IndexCoord client for DataCoord") + if err := s.indexCoord.Init(); err != nil { + log.Warn("failed to init IndexCoord client for DataCoord", zap.Error(err)) + return err + } + log.Debug("init IndexCoord client for DataCoord done") + s.dataCoord.SetIndexCoord(s.indexCoord) + err = s.startGrpc() if err != nil { log.Debug("DataCoord startGrpc failed", zap.Error(err)) diff --git a/internal/distributed/datacoord/service_test.go b/internal/distributed/datacoord/service_test.go index 44a22f9ebb57c..2253ada829489 100644 --- a/internal/distributed/datacoord/service_test.go +++ b/internal/distributed/datacoord/service_test.go @@ -21,10 +21,12 @@ import ( "errors" "testing" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/types" "github.com/stretchr/testify/assert" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -83,6 +85,9 @@ func (m *MockDataCoord) Register() error { func (m *MockDataCoord) SetEtcdClient(etcdClient *clientv3.Client) { } +func (m *MockDataCoord) 
SetIndexCoord(indexCoord types.IndexCoord) { +} + func (m *MockDataCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { return m.states, m.err } @@ -207,6 +212,10 @@ func Test_NewServer(t *testing.T) { t.Run("Run", func(t *testing.T) { server.dataCoord = &MockDataCoord{} + indexCoord := mocks.NewMockIndexCoord(t) + indexCoord.EXPECT().Init().Return(nil) + server.indexCoord = indexCoord + err := server.Run() assert.Nil(t, err) }) diff --git a/internal/mocks/mock_indexcoord.go b/internal/mocks/mock_indexcoord.go new file mode 100644 index 0000000000000..f72578cd254a8 --- /dev/null +++ b/internal/mocks/mock_indexcoord.go @@ -0,0 +1,704 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + commonpb "github.com/milvus-io/milvus/internal/proto/commonpb" + + indexpb "github.com/milvus-io/milvus/internal/proto/indexpb" + + internalpb "github.com/milvus-io/milvus/internal/proto/internalpb" + + milvuspb "github.com/milvus-io/milvus/internal/proto/milvuspb" + + mock "github.com/stretchr/testify/mock" +) + +// MockIndexCoord is an autogenerated mock type for the IndexCoord type +type MockIndexCoord struct { + mock.Mock +} + +type MockIndexCoord_Expecter struct { + mock *mock.Mock +} + +func (_m *MockIndexCoord) EXPECT() *MockIndexCoord_Expecter { + return &MockIndexCoord_Expecter{mock: &_m.Mock} +} + +// CreateIndex provides a mock function with given fields: ctx, req +func (_m *MockIndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) { + ret := _m.Called(ctx, req) + + var r0 *commonpb.Status + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.CreateIndexRequest) *commonpb.Status); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *indexpb.CreateIndexRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = 
ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_CreateIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateIndex' +type MockIndexCoord_CreateIndex_Call struct { + *mock.Call +} + +// CreateIndex is a helper method to define mock.On call +// - ctx context.Context +// - req *indexpb.CreateIndexRequest +func (_e *MockIndexCoord_Expecter) CreateIndex(ctx interface{}, req interface{}) *MockIndexCoord_CreateIndex_Call { + return &MockIndexCoord_CreateIndex_Call{Call: _e.mock.On("CreateIndex", ctx, req)} +} + +func (_c *MockIndexCoord_CreateIndex_Call) Run(run func(ctx context.Context, req *indexpb.CreateIndexRequest)) *MockIndexCoord_CreateIndex_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexpb.CreateIndexRequest)) + }) + return _c +} + +func (_c *MockIndexCoord_CreateIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *MockIndexCoord_CreateIndex_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// DescribeIndex provides a mock function with given fields: ctx, req +func (_m *MockIndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) { + ret := _m.Called(ctx, req) + + var r0 *indexpb.DescribeIndexResponse + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.DescribeIndexRequest) *indexpb.DescribeIndexResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*indexpb.DescribeIndexResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *indexpb.DescribeIndexRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_DescribeIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DescribeIndex' +type MockIndexCoord_DescribeIndex_Call struct { + *mock.Call +} + +// DescribeIndex is a helper method to define mock.On call +// - ctx 
context.Context +// - req *indexpb.DescribeIndexRequest +func (_e *MockIndexCoord_Expecter) DescribeIndex(ctx interface{}, req interface{}) *MockIndexCoord_DescribeIndex_Call { + return &MockIndexCoord_DescribeIndex_Call{Call: _e.mock.On("DescribeIndex", ctx, req)} +} + +func (_c *MockIndexCoord_DescribeIndex_Call) Run(run func(ctx context.Context, req *indexpb.DescribeIndexRequest)) *MockIndexCoord_DescribeIndex_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexpb.DescribeIndexRequest)) + }) + return _c +} + +func (_c *MockIndexCoord_DescribeIndex_Call) Return(_a0 *indexpb.DescribeIndexResponse, _a1 error) *MockIndexCoord_DescribeIndex_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// DropIndex provides a mock function with given fields: ctx, req +func (_m *MockIndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { + ret := _m.Called(ctx, req) + + var r0 *commonpb.Status + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.DropIndexRequest) *commonpb.Status); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *indexpb.DropIndexRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_DropIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropIndex' +type MockIndexCoord_DropIndex_Call struct { + *mock.Call +} + +// DropIndex is a helper method to define mock.On call +// - ctx context.Context +// - req *indexpb.DropIndexRequest +func (_e *MockIndexCoord_Expecter) DropIndex(ctx interface{}, req interface{}) *MockIndexCoord_DropIndex_Call { + return &MockIndexCoord_DropIndex_Call{Call: _e.mock.On("DropIndex", ctx, req)} +} + +func (_c *MockIndexCoord_DropIndex_Call) Run(run func(ctx context.Context, req *indexpb.DropIndexRequest)) 
*MockIndexCoord_DropIndex_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexpb.DropIndexRequest)) + }) + return _c +} + +func (_c *MockIndexCoord_DropIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *MockIndexCoord_DropIndex_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetComponentStates provides a mock function with given fields: ctx +func (_m *MockIndexCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + ret := _m.Called(ctx) + + var r0 *internalpb.ComponentStates + if rf, ok := ret.Get(0).(func(context.Context) *internalpb.ComponentStates); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*internalpb.ComponentStates) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_GetComponentStates_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetComponentStates' +type MockIndexCoord_GetComponentStates_Call struct { + *mock.Call +} + +// GetComponentStates is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockIndexCoord_Expecter) GetComponentStates(ctx interface{}) *MockIndexCoord_GetComponentStates_Call { + return &MockIndexCoord_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates", ctx)} +} + +func (_c *MockIndexCoord_GetComponentStates_Call) Run(run func(ctx context.Context)) *MockIndexCoord_GetComponentStates_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockIndexCoord_GetComponentStates_Call) Return(_a0 *internalpb.ComponentStates, _a1 error) *MockIndexCoord_GetComponentStates_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetIndexBuildProgress provides a mock function with given fields: ctx, req +func (_m *MockIndexCoord) GetIndexBuildProgress(ctx context.Context, 
req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) { + ret := _m.Called(ctx, req) + + var r0 *indexpb.GetIndexBuildProgressResponse + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.GetIndexBuildProgressRequest) *indexpb.GetIndexBuildProgressResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*indexpb.GetIndexBuildProgressResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *indexpb.GetIndexBuildProgressRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_GetIndexBuildProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexBuildProgress' +type MockIndexCoord_GetIndexBuildProgress_Call struct { + *mock.Call +} + +// GetIndexBuildProgress is a helper method to define mock.On call +// - ctx context.Context +// - req *indexpb.GetIndexBuildProgressRequest +func (_e *MockIndexCoord_Expecter) GetIndexBuildProgress(ctx interface{}, req interface{}) *MockIndexCoord_GetIndexBuildProgress_Call { + return &MockIndexCoord_GetIndexBuildProgress_Call{Call: _e.mock.On("GetIndexBuildProgress", ctx, req)} +} + +func (_c *MockIndexCoord_GetIndexBuildProgress_Call) Run(run func(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest)) *MockIndexCoord_GetIndexBuildProgress_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexpb.GetIndexBuildProgressRequest)) + }) + return _c +} + +func (_c *MockIndexCoord_GetIndexBuildProgress_Call) Return(_a0 *indexpb.GetIndexBuildProgressResponse, _a1 error) *MockIndexCoord_GetIndexBuildProgress_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetIndexInfos provides a mock function with given fields: ctx, req +func (_m *MockIndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) { + ret := 
_m.Called(ctx, req) + + var r0 *indexpb.GetIndexInfoResponse + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.GetIndexInfoRequest) *indexpb.GetIndexInfoResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*indexpb.GetIndexInfoResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *indexpb.GetIndexInfoRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_GetIndexInfos_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexInfos' +type MockIndexCoord_GetIndexInfos_Call struct { + *mock.Call +} + +// GetIndexInfos is a helper method to define mock.On call +// - ctx context.Context +// - req *indexpb.GetIndexInfoRequest +func (_e *MockIndexCoord_Expecter) GetIndexInfos(ctx interface{}, req interface{}) *MockIndexCoord_GetIndexInfos_Call { + return &MockIndexCoord_GetIndexInfos_Call{Call: _e.mock.On("GetIndexInfos", ctx, req)} +} + +func (_c *MockIndexCoord_GetIndexInfos_Call) Run(run func(ctx context.Context, req *indexpb.GetIndexInfoRequest)) *MockIndexCoord_GetIndexInfos_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexpb.GetIndexInfoRequest)) + }) + return _c +} + +func (_c *MockIndexCoord_GetIndexInfos_Call) Return(_a0 *indexpb.GetIndexInfoResponse, _a1 error) *MockIndexCoord_GetIndexInfos_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetIndexState provides a mock function with given fields: ctx, req +func (_m *MockIndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) { + ret := _m.Called(ctx, req) + + var r0 *indexpb.GetIndexStateResponse + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.GetIndexStateRequest) *indexpb.GetIndexStateResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*indexpb.GetIndexStateResponse) 
+ } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *indexpb.GetIndexStateRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_GetIndexState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexState' +type MockIndexCoord_GetIndexState_Call struct { + *mock.Call +} + +// GetIndexState is a helper method to define mock.On call +// - ctx context.Context +// - req *indexpb.GetIndexStateRequest +func (_e *MockIndexCoord_Expecter) GetIndexState(ctx interface{}, req interface{}) *MockIndexCoord_GetIndexState_Call { + return &MockIndexCoord_GetIndexState_Call{Call: _e.mock.On("GetIndexState", ctx, req)} +} + +func (_c *MockIndexCoord_GetIndexState_Call) Run(run func(ctx context.Context, req *indexpb.GetIndexStateRequest)) *MockIndexCoord_GetIndexState_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexpb.GetIndexStateRequest)) + }) + return _c +} + +func (_c *MockIndexCoord_GetIndexState_Call) Return(_a0 *indexpb.GetIndexStateResponse, _a1 error) *MockIndexCoord_GetIndexState_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetMetrics provides a mock function with given fields: ctx, req +func (_m *MockIndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + ret := _m.Called(ctx, req) + + var r0 *milvuspb.GetMetricsResponse + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.GetMetricsRequest) *milvuspb.GetMetricsResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.GetMetricsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.GetMetricsRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_GetMetrics_Call is a *mock.Call that shadows Run/Return methods with type explicit 
version for method 'GetMetrics' +type MockIndexCoord_GetMetrics_Call struct { + *mock.Call +} + +// GetMetrics is a helper method to define mock.On call +// - ctx context.Context +// - req *milvuspb.GetMetricsRequest +func (_e *MockIndexCoord_Expecter) GetMetrics(ctx interface{}, req interface{}) *MockIndexCoord_GetMetrics_Call { + return &MockIndexCoord_GetMetrics_Call{Call: _e.mock.On("GetMetrics", ctx, req)} +} + +func (_c *MockIndexCoord_GetMetrics_Call) Run(run func(ctx context.Context, req *milvuspb.GetMetricsRequest)) *MockIndexCoord_GetMetrics_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.GetMetricsRequest)) + }) + return _c +} + +func (_c *MockIndexCoord_GetMetrics_Call) Return(_a0 *milvuspb.GetMetricsResponse, _a1 error) *MockIndexCoord_GetMetrics_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetSegmentIndexState provides a mock function with given fields: ctx, req +func (_m *MockIndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) { + ret := _m.Called(ctx, req) + + var r0 *indexpb.GetSegmentIndexStateResponse + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.GetSegmentIndexStateRequest) *indexpb.GetSegmentIndexStateResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*indexpb.GetSegmentIndexStateResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *indexpb.GetSegmentIndexStateRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_GetSegmentIndexState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentIndexState' +type MockIndexCoord_GetSegmentIndexState_Call struct { + *mock.Call +} + +// GetSegmentIndexState is a helper method to define mock.On call +// - ctx context.Context +// - req 
*indexpb.GetSegmentIndexStateRequest +func (_e *MockIndexCoord_Expecter) GetSegmentIndexState(ctx interface{}, req interface{}) *MockIndexCoord_GetSegmentIndexState_Call { + return &MockIndexCoord_GetSegmentIndexState_Call{Call: _e.mock.On("GetSegmentIndexState", ctx, req)} +} + +func (_c *MockIndexCoord_GetSegmentIndexState_Call) Run(run func(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest)) *MockIndexCoord_GetSegmentIndexState_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexpb.GetSegmentIndexStateRequest)) + }) + return _c +} + +func (_c *MockIndexCoord_GetSegmentIndexState_Call) Return(_a0 *indexpb.GetSegmentIndexStateResponse, _a1 error) *MockIndexCoord_GetSegmentIndexState_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetStatisticsChannel provides a mock function with given fields: ctx +func (_m *MockIndexCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + ret := _m.Called(ctx) + + var r0 *milvuspb.StringResponse + if rf, ok := ret.Get(0).(func(context.Context) *milvuspb.StringResponse); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.StringResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_GetStatisticsChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStatisticsChannel' +type MockIndexCoord_GetStatisticsChannel_Call struct { + *mock.Call +} + +// GetStatisticsChannel is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockIndexCoord_Expecter) GetStatisticsChannel(ctx interface{}) *MockIndexCoord_GetStatisticsChannel_Call { + return &MockIndexCoord_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel", ctx)} +} + +func (_c *MockIndexCoord_GetStatisticsChannel_Call) Run(run func(ctx 
context.Context)) *MockIndexCoord_GetStatisticsChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockIndexCoord_GetStatisticsChannel_Call) Return(_a0 *milvuspb.StringResponse, _a1 error) *MockIndexCoord_GetStatisticsChannel_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// Init provides a mock function with given fields: +func (_m *MockIndexCoord) Init() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIndexCoord_Init_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Init' +type MockIndexCoord_Init_Call struct { + *mock.Call +} + +// Init is a helper method to define mock.On call +func (_e *MockIndexCoord_Expecter) Init() *MockIndexCoord_Init_Call { + return &MockIndexCoord_Init_Call{Call: _e.mock.On("Init")} +} + +func (_c *MockIndexCoord_Init_Call) Run(run func()) *MockIndexCoord_Init_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockIndexCoord_Init_Call) Return(_a0 error) *MockIndexCoord_Init_Call { + _c.Call.Return(_a0) + return _c +} + +// Register provides a mock function with given fields: +func (_m *MockIndexCoord) Register() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIndexCoord_Register_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Register' +type MockIndexCoord_Register_Call struct { + *mock.Call +} + +// Register is a helper method to define mock.On call +func (_e *MockIndexCoord_Expecter) Register() *MockIndexCoord_Register_Call { + return &MockIndexCoord_Register_Call{Call: _e.mock.On("Register")} +} + +func (_c *MockIndexCoord_Register_Call) Run(run func()) *MockIndexCoord_Register_Call { + 
_c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockIndexCoord_Register_Call) Return(_a0 error) *MockIndexCoord_Register_Call { + _c.Call.Return(_a0) + return _c +} + +// ShowConfigurations provides a mock function with given fields: ctx, req +func (_m *MockIndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { + ret := _m.Called(ctx, req) + + var r0 *internalpb.ShowConfigurationsResponse + if rf, ok := ret.Get(0).(func(context.Context, *internalpb.ShowConfigurationsRequest) *internalpb.ShowConfigurationsResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*internalpb.ShowConfigurationsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *internalpb.ShowConfigurationsRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIndexCoord_ShowConfigurations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShowConfigurations' +type MockIndexCoord_ShowConfigurations_Call struct { + *mock.Call +} + +// ShowConfigurations is a helper method to define mock.On call +// - ctx context.Context +// - req *internalpb.ShowConfigurationsRequest +func (_e *MockIndexCoord_Expecter) ShowConfigurations(ctx interface{}, req interface{}) *MockIndexCoord_ShowConfigurations_Call { + return &MockIndexCoord_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations", ctx, req)} +} + +func (_c *MockIndexCoord_ShowConfigurations_Call) Run(run func(ctx context.Context, req *internalpb.ShowConfigurationsRequest)) *MockIndexCoord_ShowConfigurations_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*internalpb.ShowConfigurationsRequest)) + }) + return _c +} + +func (_c *MockIndexCoord_ShowConfigurations_Call) Return(_a0 *internalpb.ShowConfigurationsResponse, _a1 error) 
*MockIndexCoord_ShowConfigurations_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// Start provides a mock function with given fields: +func (_m *MockIndexCoord) Start() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIndexCoord_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type MockIndexCoord_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +func (_e *MockIndexCoord_Expecter) Start() *MockIndexCoord_Start_Call { + return &MockIndexCoord_Start_Call{Call: _e.mock.On("Start")} +} + +func (_c *MockIndexCoord_Start_Call) Run(run func()) *MockIndexCoord_Start_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockIndexCoord_Start_Call) Return(_a0 error) *MockIndexCoord_Start_Call { + _c.Call.Return(_a0) + return _c +} + +// Stop provides a mock function with given fields: +func (_m *MockIndexCoord) Stop() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIndexCoord_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockIndexCoord_Stop_Call struct { + *mock.Call +} + +// Stop is a helper method to define mock.On call +func (_e *MockIndexCoord_Expecter) Stop() *MockIndexCoord_Stop_Call { + return &MockIndexCoord_Stop_Call{Call: _e.mock.On("Stop")} +} + +func (_c *MockIndexCoord_Stop_Call) Run(run func()) *MockIndexCoord_Stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockIndexCoord_Stop_Call) Return(_a0 error) *MockIndexCoord_Stop_Call { + _c.Call.Return(_a0) + return _c +} + +type mockConstructorTestingTNewMockIndexCoord interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockIndexCoord 
creates a new instance of MockIndexCoord. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockIndexCoord(t mockConstructorTestingTNewMockIndexCoord) *MockIndexCoord { + mock := &MockIndexCoord{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/querycoord/global_meta_broker.go b/internal/querycoord/global_meta_broker.go index 9d2142f1ccc07..4286a2e98787c 100644 --- a/internal/querycoord/global_meta_broker.go +++ b/internal/querycoord/global_meta_broker.go @@ -245,11 +245,6 @@ func (broker *globalMetaBroker) getFullIndexInfos(ctx context.Context, collectio IndexSize: int64(info.SerializedSize), } - if len(info.IndexFilePaths) <= 0 { - log.Warn("index not ready", zap.Int64("index_build_id", info.BuildID)) - return nil, fmt.Errorf("index not ready, index build id: %d", info.BuildID) - } - ret[segmentID] = append(ret[segmentID], indexInfo) } } diff --git a/internal/querycoord/task_test.go b/internal/querycoord/task_test.go index 124b51315b452..dee3b603c71f7 100644 --- a/internal/querycoord/task_test.go +++ b/internal/querycoord/task_test.go @@ -1268,6 +1268,7 @@ func TestLoadBalanceAndRescheduleDmChannelTaskAfterNodeDown(t *testing.T) { if len(activeTaskValues) != 0 { break } + time.Sleep(200 * time.Millisecond) } node3, err := startQueryNodeServer(ctx) @@ -1288,6 +1289,7 @@ func TestLoadBalanceAndRescheduleDmChannelTaskAfterNodeDown(t *testing.T) { if len(triggrtTaskValues) == 0 { break } + time.Sleep(200 * time.Millisecond) } } diff --git a/internal/types/types.go b/internal/types/types.go index 7d51c80f5bf38..00f070b1b3636 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -316,6 +316,8 @@ type DataCoordComponent interface { // SetEtcdClient set EtcdClient for DataCoord // `etcdClient` is a client of etcd SetEtcdClient(etcdClient *clientv3.Client) + + SetIndexCoord(indexCoord IndexCoord) } // IndexNode is the 
interface `indexnode` package implements diff --git a/tests/python_client/testcases/test_collection.py b/tests/python_client/testcases/test_collection.py index e7a2d628d6a18..97dc149464204 100644 --- a/tests/python_client/testcases/test_collection.py +++ b/tests/python_client/testcases/test_collection.py @@ -1517,7 +1517,7 @@ def test_collection_count_after_index_created_binary(self, insert_count): assert collection_w.num_entities == insert_count @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("auto_id",[True, False]) + @pytest.mark.parametrize("auto_id", [True, False]) def test_binary_collection_with_min_dim(self, auto_id): """ target: test binary collection when dim=1 @@ -2617,7 +2617,7 @@ def test_load_partition_after_index_binary(self, binary_index, metric_type): binary_index["metric_type"] = metric_type if binary_index["index_type"] == "BIN_IVF_FLAT" and metric_type in ct.structure_metrics: error = {ct.err_code: 1, ct.err_msg: 'Invalid metric_type: SUBSTRUCTURE, ' - 'which does not match the index type: %s' % metric_type} + 'which does not match the index type: BIN_IVF_FLAT'} collection_w.create_index(ct.default_binary_vec_field_name, binary_index, check_task=CheckTasks.err_res, check_items=error) else: diff --git a/tests/python_client/testcases/test_compaction.py b/tests/python_client/testcases/test_compaction.py index 646a94a1571f2..ae33a01526556 100644 --- a/tests/python_client/testcases/test_compaction.py +++ b/tests/python_client/testcases/test_compaction.py @@ -78,6 +78,8 @@ def test_compact_partition(self): # create collection with shard_num=1, and create partition collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1) partition_w = self.init_partition_wrap(collection_wrap=collection_w) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) # insert flush twice for i in range(2): @@ -95,8 +97,15 @@ def test_compact_partition(self): target = 
c_plans.plans[0].target # verify queryNode load the compacted segments - collection_w.load() - segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] + cost = 30 + start = time() + while time() - start < cost: + collection_w.load() + segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] + if len(segment_info) == 1: + break + sleep(1.0) assert target == segment_info[0].segmentID @pytest.mark.tags(CaseLabel.L2) @@ -318,7 +327,8 @@ def test_compact_max_time_interval(self): collection_w.load() replicas = collection_w.get_replicas()[0] replica_num = len(replicas.groups) - segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] + segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] assert len(segment_info) == 1*replica_num @pytest.mark.skip(reason="TODO") @@ -381,7 +391,8 @@ def test_compact_both_delete_merge(self): check_items={"nq": ct.default_nq, "ids": ids, "limit": ct.default_limit}) - collection_w.query(f"{ct.default_int64_field_name} in {delete_ids}", check_task=CheckTasks.check_query_empty) + collection_w.query(f"{ct.default_int64_field_name} in {delete_ids}", + check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L3) def test_compact_delete_multi_segments(self): @@ -415,7 +426,8 @@ def test_compact_delete_multi_segments(self): for plan in c_plans.plans: assert len(plan.sources) == 1 - collection_w.query(f"{ct.default_int64_field_name} in {delete_ids}", check_task=CheckTasks.check_query_empty) + collection_w.query(f"{ct.default_int64_field_name} in {delete_ids}", + check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L2) def test_compact_merge_multi_shards(self): @@ -516,11 +528,12 @@ def test_compact_after_binary_index(self): c_plans = collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)[0] # waiting for handoff completed and search - cost = 30 + cost = 60 start = time() while True: sleep(5) - segment_info = 
self.utility_wrap.get_query_segment_info(collection_w.name)[0] + segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] if len(segment_info) != 0 and segment_info[0].segmentID == c_plans.plans[0].target: log.debug(segment_info) break @@ -779,6 +792,8 @@ def test_compact_merge_two_segments(self): num_of_segment = 2 # create collection shard_num=1, insert 2 segments, each with tmp_nb entities collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment, tmp_nb) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) collection_w.compact() collection_w.wait_for_compaction_completed() @@ -789,8 +804,21 @@ def test_compact_merge_two_segments(self): # verify queryNode load the compacted segments collection_w.load() - segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] - assert c_plans.plans[0].target == segment_info[0].segmentID + + start = time() + cost = 60 + while True: + sleep(5) + segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] + + # verify segments reaches threshold, auto-merge ten segments into one + if len(segments_info) == 1: + break + end = time() + if end - start > cost: + raise MilvusException(1, "Compact merge two segments more than 60s") + assert c_plans.plans[0].target == segments_info[0].segmentID @pytest.mark.tags(CaseLabel.L2) def test_compact_no_merge(self): @@ -837,6 +865,8 @@ def test_compact_manual_and_auto(self): # create collection shard_num=1, insert 11 segments, each with one entity collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment=num_of_segment) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) # waiting for auto compaction finished sleep(60) @@ -846,8 +876,19 @@ def test_compact_manual_and_auto(self): c_plans, _ = 
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact, check_items={"segment_num": 2}) collection_w.load() - segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] - assert len(segments_info) == 1 + start = time() + cost = 60 + while True: + sleep(5) + segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] + + # verify segments reaches threshold, auto-merge ten segments into one + if len(segments_info) == 1: + break + end = time() + if end - start > cost: + raise MilvusException(1, "Compact auto and manual more than 60s") assert segments_info[0].segmentID == c_plans.plans[0].target @pytest.mark.tags(CaseLabel.L1) @@ -865,6 +906,8 @@ def test_compact_merge_multi_segments(self): # create collection shard_num=1, insert 11 segments, each with one entity collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment=num_of_segment) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) collection_w.compact() collection_w.wait_for_compaction_completed() @@ -874,9 +917,21 @@ def test_compact_merge_multi_segments(self): target = c_plans.plans[0].target collection_w.load() + cost = 60 + start = time() + while True: + sleep(5) + segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] + + # verify segments reaches threshold, auto-merge ten segments into one + if len(segments_info) == 1: + break + end = time() + if end - start > cost: + raise MilvusException(1, "Compact merge multiple segments more than 60s") replicas = collection_w.get_replicas()[0] replica_num = len(replicas.groups) - segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] assert len(segments_info) == 1*replica_num assert segments_info[0].segmentID == target @@ -890,6 +945,8 @@ def test_compact_merge_inside_time_travel(self): from pymilvus import utility # create collection shard_num=1, insert 2 segments, 
each with tmp_nb entities collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) # insert twice df1 = cf.gen_default_dataframe_data(tmp_nb) @@ -927,6 +984,8 @@ def test_compact_threshold_auto_merge(self): # create collection shard_num=1, insert 10 segments, each with one entity collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment=threshold) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) # Estimated auto-merging takes 30s cost = 120 @@ -936,7 +995,8 @@ def test_compact_threshold_auto_merge(self): start = time() while True: sleep(5) - segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] + segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] # verify segments reaches threshold, auto-merge ten segments into one if len(segments_info) == 1*replica_num: @@ -959,12 +1019,12 @@ def test_compact_less_threshold_no_merge(self): # create collection shard_num=1, insert 9 segments, each with one entity collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment=less_threshold) - # load and verify no auto-merge collection_w.load() replicas = collection_w.get_replicas()[0] replica_num = len(replicas.groups) - segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] + segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] assert len(segments_info) == less_threshold*replica_num @pytest.mark.skip(reason="Todo") @@ -1011,6 +1071,8 @@ def test_compact_and_delete(self): """ # init collection with one shard, insert into two segments collection_w = self.collection_insert_multi_segments_one_shard(prefix, is_dup=False) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + 
log.debug(collection_w.index()) # compact and complete collection_w.compact() collection_w.wait_for_compaction_completed() @@ -1023,7 +1085,8 @@ def test_compact_and_delete(self): collection_w.query(expr, check_task=CheckTasks.check_query_empty) expr_1 = f'{ct.default_int64_field_name} in {[1]}' - collection_w.query(expr_1, check_task=CheckTasks.check_query_results, check_items={'exp_res': [{'int64': 1}]}) + collection_w.query(expr_1, check_task=CheckTasks.check_query_results, check_items={ + 'exp_res': [{'int64': 1}]}) @pytest.mark.tags(CaseLabel.L1) def test_compact_cross_shards(self): @@ -1036,6 +1099,8 @@ def test_compact_cross_shards(self): """ # insert into two segments with two shard collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=2) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) assert collection_w.num_entities == tmp_nb @@ -1063,6 +1128,8 @@ def test_compact_delete_cross_shards(self): shards_num = 2 # insert into two segments with two shard collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=shards_num) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) expr = f"{ct.default_int64_field_name} in [0, 99]" @@ -1089,6 +1156,8 @@ def test_compact_cross_partition(self): # create collection and partition collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1) partition_w = self.init_partition_wrap(collection_wrap=collection_w) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) # insert df = cf.gen_default_dataframe_data(tmp_nb) @@ -1118,6 +1187,8 @@ def test_compact_during_insert(self): """ collection_w = 
self.collection_insert_multi_segments_one_shard(prefix, nb_of_segment=ct.default_nb, is_dup=False) + collection_w.create_index(ct.default_float_vec_field_name, ct.default_index) + log.debug(collection_w.index()) df = cf.gen_default_dataframe_data() def do_flush(): @@ -1135,7 +1206,8 @@ def do_flush(): collection_w.load() replicas = collection_w.get_replicas()[0] replica_num = len(replicas.groups) - seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] + seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] assert len(seg_info) == 2*replica_num @pytest.mark.tags(CaseLabel.L2) @@ -1163,7 +1235,8 @@ def do_index(): collection_w.load() replicas = collection_w.get_replicas()[0] replica_num = len(replicas.groups) - seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] + seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] assert len(seg_info) == 1*replica_num @pytest.mark.tags(CaseLabel.L2) diff --git a/tests/python_client/testcases/test_delete.py b/tests/python_client/testcases/test_delete.py index f3571a9d1f6ff..7fd4b65ce1d51 100644 --- a/tests/python_client/testcases/test_delete.py +++ b/tests/python_client/testcases/test_delete.py @@ -11,7 +11,7 @@ prefix = "delete" half_nb = ct.default_nb // 2 -tmp_nb = 100 +tmp_nb = 3000 tmp_expr = f'{ct.default_int64_field_name} in {[0]}' query_res_tmp_expr = [{f'{ct.default_int64_field_name}': 0}] query_tmp_expr_str = [{f'{ct.default_string_field_name}': "0"}] @@ -43,7 +43,8 @@ def test_delete_entities(self, is_binary): expected: Query result is empty """ # init collection with default_nb default data - collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True, is_binary=is_binary)[0:4] + collection_w, _, _, ids = self.init_collection_general( + prefix, insert_data=True, is_binary=is_binary)[0:4] expr = f'{ct.default_int64_field_name} in {ids[:half_nb]}' # delete half of data @@ -63,14 +64,16 @@ def 
test_delete_without_connection(self): expected: raise exception """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] # remove connection and delete self.connection_wrap.remove_connection(ct.default_alias) res_list, _ = self.connection_wrap.list_connections() assert ct.default_alias not in res_list error = {ct.err_code: 0, ct.err_msg: "should create connect first"} - collection_w.delete(expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error) + collection_w.delete( + expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error) # Not Milvus Exception @pytest.mark.tags(CaseLabel.L2) @@ -81,9 +84,11 @@ def test_delete_expr_none(self): expected: raise exception """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] error = {ct.err_code: 0, ct.err_msg: "expr cannot be None"} - collection_w.delete(expr=None, check_task=CheckTasks.err_res, check_items=error) + collection_w.delete( + expr=None, check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("expr", [1, [], ()]) @@ -94,9 +99,11 @@ def test_delete_expr_non_string(self, expr): expected: raise exception """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] error = {ct.err_code: 0, ct.err_msg: f"expr value {expr} is illegal"} - collection_w.delete(expr, check_task=CheckTasks.err_res, check_items=error) + collection_w.delete( + expr, check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("expr", ["12-s", "中文"]) @@ 
-107,9 +114,12 @@ def test_delete_invalid_expr_string(self, expr): expected: Raise exception """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] - error = {ct.err_code: 1, ct.err_msg: f"failed to create expr plan, expr = {expr}"} - collection_w.delete(expr, check_task=CheckTasks.err_res, check_items=error) + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] + error = {ct.err_code: 1, + ct.err_msg: f"failed to create expr plan, expr = {expr}"} + collection_w.delete( + expr, check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L2) def test_delete_expr_empty_value(self): @@ -119,7 +129,8 @@ def test_delete_expr_empty_value(self): expected: assert num entities """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] expr = f'{ct.default_int64_field_name} in {[]}' # delete empty entities @@ -133,7 +144,8 @@ def test_delete_expr_single(self): expected: Describe num entities by one """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] expr = f'{ct.default_int64_field_name} in {[0]}' del_res, _ = collection_w.delete(expr) assert del_res.delete_count == 1 @@ -147,7 +159,8 @@ def test_delete_expr_all_values(self): expected: num entities unchanged and deleted data will not be queried """ # init collection with default_nb default data - collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True)[0:4] + collection_w, _, _, ids = self.init_collection_general( + prefix, insert_data=True)[0:4] expr = f'{ct.default_int64_field_name} in {ids}' del_res, _ = collection_w.delete(expr) @@ -165,7 
+178,8 @@ def test_delete_not_existed_values(self): expected: No exception """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] # No exception expr = f'{ct.default_int64_field_name} in {[tmp_nb]}' @@ -181,7 +195,8 @@ def test_delete_part_not_existed_values(self): expected: delete existed id, ignore non-existed id """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] expr = f'{ct.default_int64_field_name} in {[0, tmp_nb]}' collection_w.delete(expr=expr)[0] collection_w.query(expr, check_task=CheckTasks.check_query_empty) @@ -194,12 +209,14 @@ def test_delete_expr_inconsistent_values(self): expected: raise exception """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] expr = f'{ct.default_int64_field_name} in {[0.0, 1.0]}' # Bad exception message error = {ct.err_code: 1, ct.err_msg: "failed to create expr plan,"} - collection_w.delete(expr=expr, check_task=CheckTasks.err_res, check_items=error) + collection_w.delete( + expr=expr, check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L2) def test_delete_expr_mix_values(self): @@ -209,12 +226,14 @@ def test_delete_expr_mix_values(self): expected: raise exception """ # init collection with tmp_nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] expr = f'{ct.default_int64_field_name} in {[0, 1.0]}' # Bad exception message error = {ct.err_code: 1, ct.err_msg: 
"failed to create expr plan"} - collection_w.delete(expr=expr, check_task=CheckTasks.err_res, check_items=error) + collection_w.delete( + expr=expr, check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L0) def test_delete_partition(self): @@ -224,7 +243,8 @@ def test_delete_partition(self): expected: verify partition entities are deleted """ # init collection and partition - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) partition_w = self.init_partition_wrap(collection_wrap=collection_w) # load collection and insert data to partition @@ -233,11 +253,13 @@ def test_delete_partition(self): partition_w.insert(df) # delete ids from partition - del_res, _ = collection_w.delete(tmp_expr, partition_name=partition_w.name) + del_res, _ = collection_w.delete( + tmp_expr, partition_name=partition_w.name) assert del_res.delete_count == 1 # query with deleted id and query with existed id - collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty, partition_names=[partition_w.name]) + collection_w.query( + tmp_expr, check_task=CheckTasks.check_query_empty, partition_names=[partition_w.name]) res = df.iloc[1:2, :1].to_dict('records') collection_w.query(f'{ct.default_int64_field_name} in [1]', check_task=CheckTasks.check_query_results, check_items={exp_res: res}) @@ -250,8 +272,10 @@ def test_delete_default_partition(self): expected: assert delete successfully """ # create, insert with flush, load collection - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] - del_res, _ = collection_w.delete(tmp_expr, partition_name=ct.default_partition_name) + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] + del_res, _ = collection_w.delete( + tmp_expr, partition_name=ct.default_partition_name) assert del_res.delete_count == 1 collection_w.num_entities collection_w.query(tmp_expr, 
check_task=CheckTasks.check_query_empty) @@ -265,10 +289,13 @@ def test_delete_non_string_partition_name(self, partition_name): expected: Raise exception """ # create, insert with flush, load collection - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] - error = {ct.err_code: 0, ct.err_msg: f"partition_name value {partition_name} is illegal"} - collection_w.delete(tmp_expr, partition_name=partition_name, check_task=CheckTasks.err_res, check_items=error) + error = {ct.err_code: 0, + ct.err_msg: f"partition_name value {partition_name} is illegal"} + collection_w.delete(tmp_expr, partition_name=partition_name, + check_task=CheckTasks.err_res, check_items=error) class TestDeleteOperation(TestcaseBase): @@ -298,7 +325,8 @@ def test_delete_entities_repeatedly(self): expected: No exception for second deletion """ # init collection with nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] # assert delete successfully and no exception collection_w.delete(expr=tmp_expr) @@ -317,11 +345,14 @@ def test_delete_after_index(self): expected: assert index and deleted id not in search result """ # create collection, insert tmp_nb, flush and load - collection_w, vectors = self.init_collection_general(prefix, insert_data=True)[0:2] + collection_w, vectors = self.init_collection_general( + prefix, insert_data=True)[0:2] # create index - index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}} - collection_w.create_index(ct.default_float_vec_field_name, index_params) + index_params = {"index_type": "IVF_SQ8", + "metric_type": "L2", "params": {"nlist": 64}} + collection_w.create_index( + ct.default_float_vec_field_name, index_params) assert collection_w.has_index()[0] collection_w.release() 
collection_w.load() @@ -347,7 +378,8 @@ def test_delete_and_index(self): expected: Empty search result """ # init collection and insert data without flush - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) df = cf.gen_default_dataframe_data() insert_res, _ = collection_w.insert(df) @@ -357,8 +389,10 @@ def test_delete_and_index(self): assert collection_w.num_entities == ct.default_nb # create index - index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}} - collection_w.create_index(ct.default_float_vec_field_name, index_params) + index_params = {"index_type": "IVF_SQ8", + "metric_type": "L2", "params": {"nlist": 64}} + collection_w.create_index( + ct.default_float_vec_field_name, index_params) assert collection_w.has_index()[0] collection_w.load() @@ -367,7 +401,8 @@ def test_delete_and_index(self): ct.default_search_params, ct.default_limit) log.debug(search_res[0].ids) # assert search results not contains deleted ids - inter = set(insert_res.primary_keys[:ct.default_nb // 2]).intersection(set(search_res[0].ids)) + inter = set( + insert_res.primary_keys[:ct.default_nb // 2]).intersection(set(search_res[0].ids)) log.debug(inter) assert len(inter) == 0 @@ -383,7 +418,8 @@ def test_delete_query_ids_both_sealed_and_channel(self): expected: Empty query result """ # init collection and insert data without flush - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -399,7 +435,8 @@ def test_delete_query_ids_both_sealed_and_channel(self): # insert id tmp_nb and delete id 0 and tmp_nb df_new = cf.gen_default_dataframe_data(nb=1, start=tmp_nb) collection_w.insert(df_new) - collection_w.delete(expr=f'{ct.default_int64_field_name} in {[tmp_nb]}') + collection_w.delete( + 
expr=f'{ct.default_int64_field_name} in {[tmp_nb]}') # query with id 0 and tmp_nb collection_w.query(expr=f'{ct.default_int64_field_name} in {[0, tmp_nb]}', @@ -413,7 +450,8 @@ def test_delete_search(self): expected: deleted entity is not in the search result """ # init collection with nb default data - collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True)[0:4] + collection_w, _, _, ids = self.init_collection_general( + prefix, insert_data=True)[0:4] entity, _ = collection_w.query(tmp_expr, output_fields=["%"]) search_res, _ = collection_w.search([entity[0][ct.default_float_vec_field_name]], ct.default_float_vec_field_name, @@ -428,7 +466,8 @@ def test_delete_search(self): ct.default_search_params, ct.default_limit) # assert search result is not equal to entity log.debug(f"Second search result ids: {search_res_2[0].ids}") - inter = set(ids[:ct.default_nb // 2]).intersection(set(search_res_2[0].ids)) + inter = set(ids[:ct.default_nb // 2] + ).intersection(set(search_res_2[0].ids)) # Using bounded staleness, we could still search the "deleted" entities, # since the search requests arrived query nodes earlier than query nodes consume the delete requests. 
assert len(inter) == 0 @@ -442,7 +481,8 @@ def test_delete_expr_repeated_values(self): expected: delete one entity """ # init collection with nb default data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] expr = f'{ct.default_int64_field_name} in {[0, 0, 0]}' del_res, _ = collection_w.delete(expr) assert del_res.delete_count == 3 @@ -458,7 +498,8 @@ def test_delete_duplicate_primary_keys(self): expected: currently only delete one entity, query get one entity todo delete all entities """ - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) df = cf.gen_default_dataframe_data(nb=tmp_nb) df[ct.default_int64_field_name] = 0 collection_w.insert(df) @@ -484,7 +525,8 @@ def test_delete_empty_partition(self): expected: No exception """ # init collection and partition - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) partition_w = self.init_partition_wrap(collection_wrap=collection_w) collection_w.delete(tmp_expr, partition_name=partition_w.name) @@ -497,11 +539,14 @@ def test_delete_not_existed_partition(self): expected: raise exception """ # init collection with tmp_nb data - collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0] + collection_w = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True)[0] # raise exception - error = {ct.err_code: 1, ct.err_msg: f"partitionID of partitionName:{ct.default_tag} can not be find"} - collection_w.delete(tmp_expr, partition_name=ct.default_tag, check_task=CheckTasks.err_res, check_items=error) + error = {ct.err_code: 1, + ct.err_msg: f"partitionID of partitionName:{ct.default_tag} can not be find"} + collection_w.delete(tmp_expr, partition_name=ct.default_tag, + 
check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) def test_delete_from_partition_with_another_ids(self): @@ -514,12 +559,14 @@ def test_delete_from_partition_with_another_ids(self): """ half = tmp_nb // 2 # create, insert, flush, load - collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half) + collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half( + half) # delete entities from another partition expr = f'{ct.default_int64_field_name} in {[0]}' collection_w.delete(expr, partition_name=ct.default_partition_name) - collection_w.query(expr, check_task=CheckTasks.check_query_results, check_items={exp_res: query_res_tmp_expr}) + collection_w.query(expr, check_task=CheckTasks.check_query_results, check_items={ + exp_res: query_res_tmp_expr}) # delete entities from own partition collection_w.delete(expr, partition_name=partition_w.name) @@ -534,7 +581,8 @@ def test_delete_from_partitions_with_same_ids(self): expected: The data only in partition_1 will be deleted """ # init collection and partition - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) partition_w = self.init_partition_wrap(collection_wrap=collection_w) # insert same data into partition_w and default partition @@ -561,7 +609,8 @@ def test_delete_auto_id_collection(self): expected: versify delete successfully """ # init an auto_id collection and insert tmp_nb data - collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4] + collection_w, _, _, ids = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4] # delete with insert ids expr = f'{ct.default_int64_field_name} in {[ids[0]]}' @@ -581,7 +630,8 @@ def test_delete_query_without_loading(self): expected: Raise exception """ # create collection, insert data without flush - collection_w 
= self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) assert collection_w.num_entities == tmp_nb @@ -591,8 +641,10 @@ def test_delete_query_without_loading(self): assert res.delete_count == 1 # query without loading and raise exception - error = {ct.err_code: 1, ct.err_msg: f"collection {collection_w.name} was not loaded into memory"} - collection_w.query(expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error) + error = {ct.err_code: 1, + ct.err_msg: f"collection {collection_w.name} was not loaded into memory"} + collection_w.query( + expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) def test_delete_sealed_segment_without_flush(self): @@ -604,7 +656,8 @@ def test_delete_sealed_segment_without_flush(self): expected: No query result """ # create collection, insert data without flush - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) assert collection_w.num_entities == tmp_nb @@ -628,7 +681,8 @@ def test_delete_growing_data_channel_delete(self): expected: No query result """ # create collection - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) # load collection and the queryNode watch the insertChannel collection_w.load() # insert data @@ -651,7 +705,8 @@ def test_delete_sealed_data_channel_delete(self): expected: Delete successfully and no query result """ # create collection and insert flush data - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) df = cf.gen_default_dataframe_data(tmp_nb) 
collection_w.insert(df) assert collection_w.num_entities == tmp_nb @@ -674,7 +729,8 @@ def test_delete_sealed_segment_with_flush(self): expected: No query result """ # create collection - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) # insert and flush data df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -704,7 +760,8 @@ def test_delete_sealed_segment_with_twice_flush(self): expected: No query result """ # create collection - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) # insert and flush data df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -729,7 +786,8 @@ def test_delete_sealed_data_sealed_delete(self): expected: Empty query result """ # create collection - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) # insert without flush df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -757,7 +815,8 @@ def test_delete_insert_same_id_growing(self, to_query, to_flush): expected: Verify that the query gets the newly inserted entity """ # init collection and insert data without flush - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) collection_w.load() df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -766,7 +825,8 @@ def test_delete_insert_same_id_growing(self, to_query, to_flush): del_res, _ = collection_w.delete(tmp_expr) log.debug(f'to_query:{to_query}') if to_query: - collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty) + collection_w.query( + tmp_expr, check_task=CheckTasks.check_query_empty) # insert entity with primary key 0 df_new = 
cf.gen_default_dataframe_data(1) @@ -797,7 +857,8 @@ def test_delete_insert_same_id_sealed(self, to_query): expected: Verify that the query gets the newly inserted entity """ # init collection and insert data without flush - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) # insert df = cf.gen_default_dataframe_data(1000) @@ -809,12 +870,14 @@ def test_delete_insert_same_id_sealed(self, to_query): res = df.iloc[:1, :1].to_dict('records') collection_w.search(data=[df[ct.default_float_vec_field_name][0]], anns_field=ct.default_float_vec_field_name, param=default_search_params, limit=1) - collection_w.query(tmp_expr, check_task=CheckTasks.check_query_results, check_items={'exp_res': res}) + collection_w.query( + tmp_expr, check_task=CheckTasks.check_query_results, check_items={'exp_res': res}) # delete collection_w.delete(tmp_expr) if to_query: - collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty) + collection_w.query( + tmp_expr, check_task=CheckTasks.check_query_empty) # re-insert df_new = cf.gen_default_dataframe_data(nb=1) @@ -838,7 +901,8 @@ def test_delete_entity_loop(self): expected: No exception """ # init an auto_id collection and insert tmp_nb data, flush and load - collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4] + collection_w, _, _, ids = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4] for del_id in ids: expr = f'{ct.default_int64_field_name} in {[del_id]}' @@ -857,7 +921,8 @@ def test_delete_flush_loop(self): expected: No exception """ # init an auto_id collection and insert tmp_nb data - collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4] + collection_w, _, _, ids = self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4] batch = 10 for i in 
range(tmp_nb // batch): @@ -885,7 +950,8 @@ def test_delete_merge_same_id_channel_and_sealed(self, to_flush_data, to_flush_d expected: Empty query result """ # init collection and insert data without flush - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), shards_num=1) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -924,7 +990,8 @@ def test_delete_merge_ids_channel_and_sealed(self): expected: Empty query result """ # init collection and insert data without flush - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), shards_num=1) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -953,7 +1020,12 @@ def test_delete_query_after_handoff(self): expected: Delete successfully, query get empty result """ # init collection and load - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), shards_num=1) + index_params = {"index_type": "IVF_SQ8", + "metric_type": "L2", "params": {"nlist": 64}} + collection_w.create_index( + ct.default_float_vec_field_name, index_params) collection_w.load() # insert data and delete id 0 @@ -967,7 +1039,8 @@ def test_delete_query_after_handoff(self): # wait for the handoff to complete while True: time.sleep(0.5) - segment_infos = self.utility_wrap.get_query_segment_info(collection_w.name)[0] + segment_infos = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] if len(segment_infos) > 0 and segment_infos[0].state == SegmentState.Sealed: break # query deleted id @@ -998,7 +1071,8 @@ def test_delete_time_travel(self): insert_res, _ = collection_w.insert(df) collection_w.load() - tt = self.utility_wrap.mkts_from_hybridts(insert_res.timestamp, milliseconds=0.) 
+ tt = self.utility_wrap.mkts_from_hybridts( + insert_res.timestamp, milliseconds=0.) res_before, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(), ct.default_float_vec_field_name, @@ -1025,7 +1099,8 @@ def test_delete_insert_multi(self): expected: Verify result """ # create collection, insert multi times, each with tmp_nb entities - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix)) multi = 3 for i in range(multi): start = i * tmp_nb @@ -1057,13 +1132,15 @@ def test_delete_sealed_only(self): expected: """ # init collection and insert data without flush - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=2) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), shards_num=2) # insert 3000 entities into 3 segments segment_num = 3 segment_per_count = 2000 ids = [] for i in range(segment_num): - df = cf.gen_default_dataframe_data(nb=segment_per_count, start=(i * segment_per_count)) + df = cf.gen_default_dataframe_data( + nb=segment_per_count, start=(i * segment_per_count)) res, _ = collection_w.insert(df) assert collection_w.num_entities == (i + 1) * segment_per_count ids.extend(res.primary_keys) @@ -1090,12 +1167,14 @@ def test_delete_entities_repeatedly_with_string(self): """ # init collection with nb default data collection_w = \ - self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0] + self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0] # assert delete successfully and no exception collection_w.delete(expr=default_string_expr) collection_w.num_entities - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) collection_w.delete(expr=default_string_expr) 
@pytest.mark.tags(CaseLabel.L1) @@ -1112,17 +1191,21 @@ def test_delete_all_index_with_string(self): primary_field=ct.default_string_field_name)[0:2] # create index - index_params_one = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}} - collection_w.create_index(ct.default_float_vec_field_name, index_params_one, index_name=index_name1) + index_params_one = {"index_type": "IVF_SQ8", + "metric_type": "L2", "params": {"nlist": 64}} + collection_w.create_index( + ct.default_float_vec_field_name, index_params_one, index_name=index_name1) index_params_two = {} - collection_w.create_index(ct.default_string_field_name, index_params=index_params_two, index_name=index_name2) + collection_w.create_index( + ct.default_string_field_name, index_params=index_params_two, index_name=index_name2) assert collection_w.has_index(index_name=index_name2) collection_w.release() collection_w.load() # delete entity collection_w.delete(default_string_expr) - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) assert collection_w.has_index(index_name=index_name2) # search with id 0 vectors @@ -1143,7 +1226,8 @@ def test_delete_and_index_with_string(self): """ # init collection and insert data without flush schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema) df = cf.gen_default_dataframe_data() insert_res, _ = collection_w.insert(df) @@ -1154,8 +1238,10 @@ def test_delete_and_index_with_string(self): assert collection_w.num_entities == ct.default_nb # create index - index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}} - collection_w.create_index(ct.default_float_vec_field_name, index_params) + index_params = {"index_type": "IVF_SQ8", + 
"metric_type": "L2", "params": {"nlist": 64}} + collection_w.create_index( + ct.default_float_vec_field_name, index_params) assert collection_w.has_index()[0] collection_w.load() @@ -1164,7 +1250,8 @@ def test_delete_and_index_with_string(self): ct.default_search_params, ct.default_limit) log.debug(search_res[0].ids) # assert search results not contains deleted ids - inter = set(insert_res.primary_keys[:ct.default_nb // 2]).intersection(set(search_res[0].ids)) + inter = set( + insert_res.primary_keys[:ct.default_nb // 2]).intersection(set(search_res[0].ids)) log.debug(inter) assert len(inter) == 0 @@ -1181,7 +1268,8 @@ def test_delete_query_ids_both_sealed_and_channel_with_string(self): """ # init collection and insert data without flush schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -1192,12 +1280,14 @@ def test_delete_query_ids_both_sealed_and_channel_with_string(self): # load and query id 0 collection_w.load() - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) # insert id tmp_nb and delete id 0 and tmp_nb df_new = cf.gen_default_dataframe_data(nb=1, start=tmp_nb) collection_w.insert(df_new) - collection_w.delete(expr=f'{ct.default_string_field_name} in ["tmp_nb"]') + collection_w.delete( + expr=f'{ct.default_string_field_name} in ["tmp_nb"]') # query with id 0 and tmp_nb collection_w.query(expr=f'{ct.default_string_field_name} in ["0", "tmp_nb"]', @@ -1213,7 +1303,8 @@ def test_delete_search_with_string(self): # init collection with nb default data collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True, primary_field=ct.default_string_field_name)[0:4] - entity, _ = 
collection_w.query(default_string_expr, output_fields=["%"]) + entity, _ = collection_w.query( + default_string_expr, output_fields=["%"]) search_res, _ = collection_w.search([entity[0][ct.default_float_vec_field_name]], ct.default_float_vec_field_name, ct.default_search_params, ct.default_limit) @@ -1228,7 +1319,8 @@ def test_delete_search_with_string(self): ct.default_search_params, ct.default_limit) # assert search result is not equal to entity log.debug(f"Second search result ids: {search_res_2[0].ids}") - inter = set(ids[:ct.default_nb // 2]).intersection(set(search_res_2[0].ids)) + inter = set(ids[:ct.default_nb // 2] + ).intersection(set(search_res_2[0].ids)) # Using bounded staleness, we could still search the "deleted" entities, # since the search requests arrived query nodes earlier than query nodes consume the delete requests. assert len(inter) == 0 @@ -1243,7 +1335,8 @@ def test_delete_expr_repeated_values_with_string(self): """ # init collection with nb default data collection_w = \ - self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0] + self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0] expr = f'{ct.default_string_field_name} in ["0", "0", "0"]' del_res, _ = collection_w.delete(expr) assert del_res.delete_count == 3 @@ -1260,7 +1353,8 @@ def test_delete_duplicate_primary_keys_with_string(self): todo delete all entities """ schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema) df = cf.gen_default_dataframe_data(nb=tmp_nb) df[ct.default_string_field_name] = "0" collection_w.insert(df) @@ -1289,7 +1383,8 @@ def test_delete_from_partitions_with_same_ids_of_string(self): """ # init collection and partition schema = 
cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema) partition_w = self.init_partition_wrap(collection_wrap=collection_w) # insert same data into partition_w and default partition @@ -1298,7 +1393,8 @@ def test_delete_from_partitions_with_same_ids_of_string(self): partition_w.insert(df) # delete same id 0 from default_partition, and query on it get empty result - collection_w.delete(default_string_expr, partition_name=ct.default_partition_name) + collection_w.delete(default_string_expr, + partition_name=ct.default_partition_name) assert collection_w.num_entities == tmp_nb * 2 collection_w.load() collection_w.query(default_string_expr, partition_names=[ct.default_partition_name], @@ -1319,7 +1415,8 @@ def test_delete_sealed_segment_without_flush_with_string(self): """ # create collection, insert data without flush schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) assert collection_w.num_entities == tmp_nb @@ -1330,7 +1427,8 @@ def test_delete_sealed_segment_without_flush_with_string(self): # load and query with id collection_w.load() - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L1) def test_delete_growing_data_channel_delete_with_string(self): @@ -1344,7 +1442,8 @@ def test_delete_growing_data_channel_delete_with_string(self): """ # create collection schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema) + 
collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema) # load collection and the queryNode watch the insertChannel collection_w.load() # insert data @@ -1354,7 +1453,8 @@ def test_delete_growing_data_channel_delete_with_string(self): del_res = collection_w.delete(default_string_expr)[0] assert del_res.delete_count == 1 # query id 0 - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L1) def test_delete_sealed_data_channel_delete_with_string(self): @@ -1368,7 +1468,8 @@ def test_delete_sealed_data_channel_delete_with_string(self): """ # create collection and insert flush data schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) assert collection_w.num_entities == tmp_nb @@ -1378,7 +1479,8 @@ def test_delete_sealed_data_channel_delete_with_string(self): # delete ids and query collection_w.delete(default_string_expr) - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L2) def test_delete_sealed_segment_with_flush_string(self): @@ -1392,7 +1494,8 @@ def test_delete_sealed_segment_with_flush_string(self): """ # create collection schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema) # insert and flush data df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -1405,7 +1508,8 @@ def 
test_delete_sealed_segment_with_flush_string(self): log.info(collection_w.num_entities) # load and query id 0 collection_w.load() - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L1) def test_delete_sealed_data_sealed_delete_string(self): @@ -1418,7 +1522,8 @@ def test_delete_sealed_data_sealed_delete_string(self): """ # create collection schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema) # insert without flush df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -1430,7 +1535,8 @@ def test_delete_sealed_data_sealed_delete_string(self): # load and query id 0 collection_w.load() - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L2) def test_delete_entity_loop_with_string(self): @@ -1494,7 +1600,8 @@ def test_delete_merge_same_id_channel_and_sealed_string(self, to_flush_data, to_ """ # init collection and insert data without flush schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema, shards_num=1) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema, shards_num=1) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -1505,7 +1612,8 @@ def test_delete_merge_same_id_channel_and_sealed_string(self, to_flush_data, to_ # load and query id 0 collection_w.load() - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) # insert new 
entity with same id 0 and query df_new = cf.gen_default_dataframe_data(1) @@ -1520,7 +1628,8 @@ def test_delete_merge_same_id_channel_and_sealed_string(self, to_flush_data, to_ collection_w.delete(default_string_expr) if to_flush_delete: log.debug(collection_w.num_entities) - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L2) def test_delete_merge_ids_channel_and_sealed_string(self): @@ -1535,7 +1644,8 @@ def test_delete_merge_ids_channel_and_sealed_string(self): """ # init collection and insert data without flush schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema, shards_num=1) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema, shards_num=1) df = cf.gen_default_dataframe_data(tmp_nb) collection_w.insert(df) @@ -1546,7 +1656,8 @@ def test_delete_merge_ids_channel_and_sealed_string(self): # load and query id 0 collection_w.load() - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) # delete id 1 and query id 0 and 1 collection_w.delete(expr=f'{ct.default_string_field_name} in ["1"]') @@ -1565,7 +1676,12 @@ def test_delete_query_after_handoff_with_string(self): """ # init collection and load schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema, shards_num=1) + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema, shards_num=1) + index_params = {"index_type": "IVF_SQ8", + "metric_type": "L2", "params": {"nlist": 64}} + collection_w.create_index( + ct.default_float_vec_field_name, index_params) collection_w.load() # insert data and delete id 0 @@ -1579,11 
+1695,13 @@ def test_delete_query_after_handoff_with_string(self): # wait for the handoff to complete while True: time.sleep(0.5) - segment_infos = self.utility_wrap.get_query_segment_info(collection_w.name)[0] + segment_infos = self.utility_wrap.get_query_segment_info(collection_w.name)[ + 0] if len(segment_infos) > 0 and segment_infos[0].state == SegmentState.Sealed: break # query deleted id - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L1) def test_delete_time_travel_string(self): @@ -1595,12 +1713,14 @@ def test_delete_time_travel_string(self): expected: search successfully """ schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + cf.gen_unique_str(prefix), schema=schema) df = cf.gen_default_dataframe_data(tmp_nb) insert_res, _ = collection_w.insert(df) collection_w.load() - tt = self.utility_wrap.mkts_from_hybridts(insert_res.timestamp, milliseconds=0.) + tt = self.utility_wrap.mkts_from_hybridts( + insert_res.timestamp, milliseconds=0.) 
res_before, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(), ct.default_float_vec_field_name, @@ -1629,7 +1749,8 @@ def test_delete_insert_multi_with_string(self): """ # create collection, insert multi times, each with tmp_nb entities schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + cf.gen_unique_str(prefix), schema=schema) multi = 3 for i in range(multi): start = i * tmp_nb @@ -1660,11 +1781,14 @@ def test_delete_invalid_expr(self): expected: Raise exception """ collection_w = \ - self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0] + self.init_collection_general( + prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0] collection_w.load() - error = {ct.err_code: 0, ct.err_msg: f"failed to create expr plan, expr = {default_invaild_string_exp}"} - collection_w.delete(expr=default_invaild_string_exp, check_task=CheckTasks.err_res, check_items=error) - + error = {ct.err_code: 0, + ct.err_msg: f"failed to create expr plan, expr = {default_invaild_string_exp}"} + collection_w.delete(expr=default_invaild_string_exp, + check_task=CheckTasks.err_res, check_items=error) + @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("to_query", [True, False]) def test_delete_insert_same_id_sealed_string(self, to_query): @@ -1679,7 +1803,8 @@ def test_delete_insert_same_id_sealed_string(self, to_query): """ # init collection and insert data without flush schema = cf.gen_string_pk_default_collection_schema() - collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), schema=schema) + collection_w = self.init_collection_wrap( + cf.gen_unique_str(prefix), schema=schema) # insert df = cf.gen_default_dataframe_data(1000) @@ -1692,12 +1817,14 @@ def test_delete_insert_same_id_sealed_string(self, to_query): default_search_params 
= {"metric_type": "L2", "params": {"nprobe": 16}} collection_w.search(data=[df[ct.default_float_vec_field_name][0]], anns_field=ct.default_float_vec_field_name, param=default_search_params, limit=1) - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_results, check_items={'exp_res': res}) + collection_w.query( + default_string_expr, check_task=CheckTasks.check_query_results, check_items={'exp_res': res}) # delete collection_w.delete(default_string_expr) if to_query: - collection_w.query(default_string_expr, check_task=CheckTasks.check_query_empty) + collection_w.query(default_string_expr, + check_task=CheckTasks.check_query_empty) # re-insert df_new = cf.gen_default_dataframe_data(nb=1) @@ -1707,6 +1834,6 @@ def test_delete_insert_same_id_sealed_string(self, to_query): # re-query res = df_new.iloc[[0], [2, 3]].to_dict('records') collection_w.query(default_string_expr, output_fields=[ct.default_float_vec_field_name], - check_task=CheckTasks.check_query_results, check_items={'exp_res': res , 'primary_field': ct.default_string_field_name, 'with_vec': True}) + check_task=CheckTasks.check_query_results, check_items={'exp_res': res, 'primary_field': ct.default_string_field_name, 'with_vec': True}) collection_w.search(data=[df_new[ct.default_float_vec_field_name][0]], anns_field=ct.default_float_vec_field_name, param=default_search_params, limit=1) diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py index c3476397cf5d5..3f10841aba39f 100644 --- a/tests/python_client/testcases/test_utility.py +++ b/tests/python_client/testcases/test_utility.py @@ -1429,6 +1429,7 @@ def test_get_sealed_query_segment_info(self): 1. length of segment is greater than 0 2. 
the sum num_rows of each segment is equal to num of entities """ + pytest.skip("QueryCoord treat all segments without index as growing segments") c_name = cf.gen_unique_str(prefix) collection_w = self.init_collection_wrap(name=c_name) nb = 3000 @@ -1489,6 +1490,7 @@ def test_load_balance_normal(self): pytest.skip("skip load balance testcase when querynode number less than 2") c_name = cf.gen_unique_str(prefix) collection_w = self.init_collection_wrap(name=c_name) + collection_w.create_index(default_field_name, default_index_params) ms = MilvusSys() nb = 3000 df = cf.gen_default_dataframe_data(nb) @@ -1604,6 +1606,7 @@ def test_load_balance_with_one_sealed_segment_id_not_exist(self): # init a collection c_name = cf.gen_unique_str(prefix) collection_w = self.init_collection_wrap(name=c_name) + collection_w.create_index(default_field_name, default_index_params) ms = MilvusSys() nb = 3000 df = cf.gen_default_dataframe_data(nb) @@ -2269,7 +2272,6 @@ def test_revoke_public_role_privilege(self, host, port): self.utility_wrap.init_role("public") self.utility_wrap.role_grant("Collection", c_name, "Insert") - @pytest.mark.tags(CaseLabel.L3) def test_role_revoke_collection_privilege(self, host, port): """ @@ -3245,7 +3247,7 @@ def test_new_user_default_owns_public_role_permission(self, host, port): self.utility_wrap.reset_password(user=user_test, old_password=password_test, new_password=password, check_task=CheckTasks.check_permission_deny) self.utility_wrap.update_password(user=user_test, old_password=password, new_password=password_test, - check_task=CheckTasks.check_permission_deny) + check_task=CheckTasks.check_permission_deny) self.utility_wrap.list_user(user_test, False, check_task=CheckTasks.check_permission_deny) # public role access