diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go
index e387fdd628475..f3e05e141463d 100644
--- a/internal/datacoord/compaction_trigger_v2.go
+++ b/internal/datacoord/compaction_trigger_v2.go
@@ -148,6 +148,9 @@ func (m *CompactionTriggerManager) setL0Triggering(b bool) {
 	m.l0SigLock.Lock()
 	defer m.l0SigLock.Unlock()
 	m.l0Triggering = b
+	if !b {
+		m.l0TickSig.Broadcast()
+	}
 }
 
 func (m *CompactionTriggerManager) startLoop() {
@@ -179,6 +182,7 @@ func (m *CompactionTriggerManager) startLoop() {
 			events, err := m.l0Policy.Trigger()
 			if err != nil {
 				log.Warn("Fail to trigger L0 policy", zap.Error(err))
+				m.setL0Triggering(false)
 				continue
 			}
 			ctx := context.Background()
diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go
index 5701dc44ae1d2..981cbffe03c96 100644
--- a/internal/datacoord/import_checker.go
+++ b/internal/datacoord/import_checker.go
@@ -96,7 +96,7 @@ func (c *importChecker) Start() {
 					log.Info("waiting for all channels to send signals",
 						zap.Strings("vchannels", job.GetVchannels()),
 						zap.Strings("readyVchannels", job.GetReadyVchannels()),
-						zap.Int64("jobID", job.GetJobID())) // TODO fubang
+						zap.Int64("jobID", job.GetJobID()))
 					continue
 				}
 				switch job.GetState() {
@@ -215,7 +215,6 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
 	log := log.With(zap.Int64("jobID", job.GetJobID()))
 	lacks := c.getLackFilesForPreImports(job)
 	if len(lacks) == 0 {
-		log.Info("import job start to preimport") // TODO fubang
 		return
 	}
 	fileGroups := lo.Chunk(lacks, Params.DataCoordCfg.FilesPerPreImportTask.GetAsInt())
@@ -395,11 +394,17 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
 	}
 
 	// wait l0 segment import and block l0 compaction
+	log.Info("start to pause l0 segment compacting", zap.Int64("jobID", job.GetJobID()))
 	c.l0CompactionTrigger.PauseL0SegmentCompacting(job.GetCollectionID())
+	log.Info("l0 segment compacting paused", zap.Int64("jobID", job.GetJobID()))
 	defer c.l0CompactionTrigger.ResumeL0SegmentCompacting(job.GetCollectionID())
 	l0ImportTasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()), WithL0CompactionSource())
 	for _, t := range l0ImportTasks {
 		if t.GetState() != datapb.ImportTaskStateV2_Completed {
+			log.Info("waiting for l0 import task...",
+				zap.Int64s("taskIDs", lo.Map(l0ImportTasks, func(t ImportTask, _ int) int64 {
+					return t.GetTaskID()
+				})))
 			return
 		}
 	}
diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go
index d3d00d793031b..1bc50576ef601 100644
--- a/internal/datacoord/import_checker_test.go
+++ b/internal/datacoord/import_checker_test.go
@@ -37,6 +37,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/proto/internalpb"
+	"github.com/milvus-io/milvus/pkg/proto/rootcoordpb"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -528,7 +529,6 @@ func TestImportCheckerCompaction(t *testing.T) {
 	catalog.EXPECT().ListImportJobs(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil)
-	catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
@@ -543,11 +543,9 @@ func TestImportCheckerCompaction(t *testing.T) {
 	imeta, err := NewImportMeta(context.TODO(), catalog)
 	assert.NoError(t, err)
 
-	meta, err := newMeta(context.TODO(), catalog, nil)
-	assert.NoError(t, err)
-
 	broker := broker2.NewMockBroker(t)
-
+	broker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{}, nil)
+	meta, err := newMeta(context.TODO(), catalog, nil, broker)
 	sjm := NewMockStatsJobManager(t)
 	l0CompactionTrigger := NewMockTriggerManager(t)
 	l0CompactionTrigger.EXPECT().PauseL0SegmentCompacting(mock.Anything).Return().Maybe()
diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go
index 5ae82e8f9f983..73ea2dc7f8bdd 100644
--- a/internal/datacoord/import_util_test.go
+++ b/internal/datacoord/import_util_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	"github.com/milvus-io/milvus/internal/datacoord/broker"
+	broker2 "github.com/milvus-io/milvus/internal/datacoord/broker"
 	"github.com/milvus-io/milvus/internal/json"
 	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	mocks2 "github.com/milvus-io/milvus/internal/mocks"
@@ -41,6 +42,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/proto/internalpb"
+	"github.com/milvus-io/milvus/pkg/proto/rootcoordpb"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -165,17 +167,18 @@ func TestImportUtil_NewImportTasksWithDataTt(t *testing.T) {
 	alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil)
 
 	catalog := mocks.NewDataCoordCatalog(t)
-	catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
+	catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
-	catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil)
 
-	meta, err := newMeta(context.TODO(), catalog, nil)
+	broker := broker2.NewMockBroker(t)
+	broker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{}, nil)
+	meta, err := newMeta(context.TODO(), catalog, nil, broker)
 	assert.NoError(t, err)
 
 	tasks, err := NewImportTasks(fileGroups, job, alloc, meta)
@@ -285,7 +288,6 @@ func TestImportUtil_AssembleRequestWithDataTt(t *testing.T) {
 	}
 
 	catalog := mocks.NewDataCoordCatalog(t)
-	catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
@@ -301,7 +303,9 @@ func TestImportUtil_AssembleRequestWithDataTt(t *testing.T) {
 			return id, id + n, nil
 		})
 
-	meta, err := newMeta(context.TODO(), catalog, nil)
+	broker := broker2.NewMockBroker(t)
+	broker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{}, nil)
+	meta, err := newMeta(context.TODO(), catalog, nil, broker)
 	assert.NoError(t, err)
 
 	segment := &SegmentInfo{
 		SegmentInfo: &datapb.SegmentInfo{ID: 5, IsImporting: true},
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 7123401befd02..61928a0b6a3ed 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -382,6 +382,10 @@ func (s *Server) initDataCoord() error {
 	}
 	log.Info("init service discovery done")
 
+	s.importMeta, err = NewImportMeta(s.ctx, s.meta.catalog)
+	if err != nil {
+		return err
+	}
 	s.initCompaction()
 	log.Info("init compaction done")
 
@@ -391,13 +395,6 @@ func (s *Server) initDataCoord() error {
 	s.initJobManager()
 	log.Info("init statsJobManager done")
 
-	s.importMeta, err = NewImportMeta(s.ctx, s.meta.catalog)
-	if err != nil {
-		return err
-	}
-	s.initCompaction()
-	log.Info("init compaction done")
-
 	if err = s.initSegmentManager(); err != nil {
 		return err
 	}
@@ -706,15 +703,10 @@ func (s *Server) initIndexNodeManager() {
 }
 
 func (s *Server) initCompaction() {
-<<<<<<< HEAD
 	cph := newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.handler)
 	cph.loadMeta()
 	s.compactionHandler = cph
-	s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
-=======
-	s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.taskScheduler, s.handler)
 	s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta, s.importMeta)
->>>>>>> support to replicate import msg
 	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
 }
diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go
index 51e9bd18caaf7..91a3e0d094ae4 100644
--- a/internal/datacoord/services_test.go
+++ b/internal/datacoord/services_test.go
@@ -22,9 +22,11 @@ import (
 	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	"github.com/milvus-io/milvus/internal/datacoord/broker"
 	"github.com/milvus-io/milvus/internal/datacoord/session"
+	"github.com/milvus-io/milvus/internal/distributed/streaming"
 	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	mocks2 "github.com/milvus-io/milvus/internal/mocks"
+	"github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
@@ -1440,6 +1442,13 @@ func TestImportV2(t *testing.T) {
 	catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil)
 	catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
+	wal := mock_streaming.NewMockWALAccesser(t)
+	b := mock_streaming.NewMockBroadcast(t)
+	wal.EXPECT().Broadcast().Return(b)
+	b.EXPECT().BlockUntilResourceKeyAckOnce(mock.Anything, mock.Anything).Return(nil)
+	streaming.SetWALForTest(wal)
+	defer streaming.RecoverWALForTest()
+
 	s.importMeta, err = NewImportMeta(context.TODO(), catalog)
 	assert.NoError(t, err)
 	resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
diff --git a/internal/datanode/msghandlerimpl/msg_handler_impl_test.go b/internal/datanode/msghandlerimpl/msg_handler_impl_test.go
index 59c9817270456..5b7367b4d47b3 100644
--- a/internal/datanode/msghandlerimpl/msg_handler_impl_test.go
+++ b/internal/datanode/msghandlerimpl/msg_handler_impl_test.go
@@ -19,15 +19,21 @@ package msghandlerimpl
 import (
+	"context"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 
+	"github.com/milvus-io/milvus/internal/distributed/streaming"
 	"github.com/milvus-io/milvus/internal/flushcommon/broker"
+	"github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
 func TestMsgHandlerImpl(t *testing.T) {
+	paramtable.Init()
+	ctx := context.Background()
 	b := broker.NewMockBroker(t)
 	m := NewMsgHandlerImpl(b)
 	assert.Panics(t, func() {
@@ -39,14 +45,17 @@ func TestMsgHandlerImpl(t *testing.T) {
 	assert.Panics(t, func() {
 		m.HandleManualFlush("", nil)
 	})
-	t.Run("HandleImport error", func(t *testing.T) {
-		b.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
-		err := m.HandleImport(nil, "", nil)
-		assert.Error(t, err)
-	})
 	t.Run("HandleImport success", func(t *testing.T) {
+		wal := mock_streaming.NewMockWALAccesser(t)
+		bo := mock_streaming.NewMockBroadcast(t)
+		wal.EXPECT().Broadcast().Return(bo)
+		bo.EXPECT().Ack(mock.Anything, mock.Anything).Return(nil)
+		streaming.SetWALForTest(wal)
+		defer streaming.RecoverWALForTest()
+
+		b.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
 		b.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(nil, nil).Once()
-		err := m.HandleImport(nil, "", nil)
+		err := m.HandleImport(ctx, "", nil)
 		assert.NoError(t, err)
 	})
 }
diff --git a/internal/flushcommon/pipeline/flow_graph_dd_node.go b/internal/flushcommon/pipeline/flow_graph_dd_node.go
index 3ff6824d51865..f9cd966868ae4 100644
--- a/internal/flushcommon/pipeline/flow_graph_dd_node.go
+++ b/internal/flushcommon/pipeline/flow_graph_dd_node.go
@@ -278,6 +278,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 			}
 		case commonpb.MsgType_Import:
 			importMsg := msg.(*msgstream.ImportMsg)
+			if importMsg.GetCollectionID() != ddn.collectionID {
+				continue
+			}
 			logger := log.With(
 				zap.String("vchannel", ddn.Name()),
 				zap.Int32("msgType", int32(msg.Type())),
diff --git a/internal/proxy/task_import.go b/internal/proxy/task_import.go
index f6b4470ab2f47..07d6683046460 100644
--- a/internal/proxy/task_import.go
+++ b/internal/proxy/task_import.go
@@ -216,7 +216,6 @@ func (it *importTask) getChannels() []pChan {
 }
 
 func (it *importTask) Execute(ctx context.Context) error {
-	// TODO fubang should send mq msg
 	jobID, err := it.node.rowIDAllocator.AllocOne()
 	if err != nil {
 		log.Ctx(ctx).Warn("alloc job id failed", zap.Error(err))
@@ -249,7 +248,7 @@ func (it *importTask) Execute(ctx context.Context) error {
 		log.Ctx(ctx).Warn("broadcast import msg failed", zap.Error(err))
 		return err
 	}
-	log.Ctx(ctx).Debug(
+	log.Ctx(ctx).Info(
 		"broadcast import msg success",
 		zap.Int64("jobID", jobID),
 		zap.Uint64("broadcastID", resp.BroadcastID),
diff --git a/internal/querycoordv2/observers/replica_observer_test.go b/internal/querycoordv2/observers/replica_observer_test.go
index a619cdd06ce82..8cfe153c361ea 100644
--- a/internal/querycoordv2/observers/replica_observer_test.go
+++ b/internal/querycoordv2/observers/replica_observer_test.go
@@ -204,7 +204,6 @@ func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() {
 
 func (suite *ReplicaObserverSuite) TestCheckSQnodesInReplica() {
 	balancer := mock_balancer.NewMockBalancer(suite.T())
-	snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)
 
 	change := make(chan struct{})
 	balancer.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error {
@@ -249,6 +248,7 @@ func (suite *ReplicaObserverSuite) TestCheckSQnodesInReplica() {
 		<-ctx.Done()
 		return context.Cause(ctx)
 	})
+	snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)
 
 	ctx := context.Background()
 	err := suite.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(suite.collectionID, 2))
diff --git a/pkg/proto/data_coord.proto b/pkg/proto/data_coord.proto
index f94d70e612743..8f18ef510bde9 100644
--- a/pkg/proto/data_coord.proto
+++ b/pkg/proto/data_coord.proto
@@ -639,7 +639,6 @@ message CompactionPlanResult {
   repeated CompactionSegment segments = 3;
   string channel = 4;
   CompactionType type = 5;
-  // l0 import file path
 }
 
 message CompactionStateResponse {
diff --git a/pkg/proto/datapb/data_coord.pb.go b/pkg/proto/datapb/data_coord.pb.go
index 5bfe843ec3238..ad33bba7b3807 100644
--- a/pkg/proto/datapb/data_coord.pb.go
+++ b/pkg/proto/datapb/data_coord.pb.go
@@ -4818,7 +4818,7 @@ type CompactionPlanResult struct {
 	State    CompactionTaskState  `protobuf:"varint,2,opt,name=state,proto3,enum=milvus.proto.data.CompactionTaskState" json:"state,omitempty"`
 	Segments []*CompactionSegment `protobuf:"bytes,3,rep,name=segments,proto3" json:"segments,omitempty"`
 	Channel  string               `protobuf:"bytes,4,opt,name=channel,proto3" json:"channel,omitempty"`
-	Type     CompactionType       `protobuf:"varint,5,opt,name=type,proto3,enum=milvus.proto.data.CompactionType" json:"type,omitempty"` // l0 import file path
+	Type     CompactionType       `protobuf:"varint,5,opt,name=type,proto3,enum=milvus.proto.data.CompactionType" json:"type,omitempty"`
 }
 
 func (x *CompactionPlanResult) Reset() {
diff --git a/scripts/run_intergration_test.sh b/scripts/run_intergration_test.sh
index 999387e43c8d7..17cd174851058 100755
--- a/scripts/run_intergration_test.sh
+++ b/scripts/run_intergration_test.sh
@@ -39,9 +39,9 @@ for d in $(go list ./tests/integration/...); do
     if [[ $d == *"coordrecovery"* ]]; then
         echo "running coordrecovery"
         # simplified command to speed up coord init test since it is large.
-        $TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=30m
+        $TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=50m -timeout=120m
     else
-        $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=15m -timeout=30m
+        $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=50m -timeout=120m
     fi
     if [ -f profile.out ]; then
         grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO}
diff --git a/tests/integration/compaction/l0_compaction_test.go b/tests/integration/compaction/l0_compaction_test.go
index 68ebb57289fa1..26f864b19325f 100644
--- a/tests/integration/compaction/l0_compaction_test.go
+++ b/tests/integration/compaction/l0_compaction_test.go
@@ -16,223 +16,224 @@
 package compaction
 
-// "context"
-// "fmt"
-// "time"
-
-// "github.com/samber/lo"
-// "go.uber.org/zap"
-// "google.golang.org/protobuf/proto"
-
-// "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-// "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
-// "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-// "github.com/milvus-io/milvus/pkg/common"
-// "github.com/milvus-io/milvus/pkg/log"
-// "github.com/milvus-io/milvus/pkg/proto/datapb"
-// "github.com/milvus-io/milvus/pkg/util/funcutil"
-// "github.com/milvus-io/milvus/pkg/util/merr"
-// "github.com/milvus-io/milvus/pkg/util/metric"
-// "github.com/milvus-io/milvus/pkg/util/paramtable"
-// "github.com/milvus-io/milvus/tests/integration"
+import (
+    "context"
+    "fmt"
+    "time"
+
+    "github.com/samber/lo"
+    "go.uber.org/zap"
+    "google.golang.org/protobuf/proto"
+
+    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+    "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+    "github.com/milvus-io/milvus/pkg/common"
+    "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/proto/datapb"
+    "github.com/milvus-io/milvus/pkg/util/funcutil"
+    "github.com/milvus-io/milvus/pkg/util/merr"
+    "github.com/milvus-io/milvus/pkg/util/metric"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
+    "github.com/milvus-io/milvus/tests/integration"
+)
 
 func (s *CompactionSuite) TestL0Compaction() {
-    // TODO fubang
-    // ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
-    // defer cancel()
-    // c := s.Cluster
-
-    // const (
-    // 	dim       = 128
-    // 	dbName    = ""
-    // 	rowNum    = 100000
-    // 	deleteCnt = 50000
-
-    // 	indexType  = integration.IndexFaissIvfFlat
-    // 	metricType = metric.L2
-    // 	vecType    = schemapb.DataType_FloatVector
-    // )
-
-    // paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "1")
-    // defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)
-
-    // collectionName := "TestCompaction_" + funcutil.GenRandomStr()
-
-    // schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, false, vecType)
-    // marshaledSchema, err := proto.Marshal(schema)
-    // s.NoError(err)
-
-    // // create collection
-    // createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
-    // 	DbName:           dbName,
-    // 	CollectionName:   collectionName,
-    // 	Schema:           marshaledSchema,
-    // 	ShardsNum:        common.DefaultShardsNum,
-    // 	ConsistencyLevel: commonpb.ConsistencyLevel_Strong,
-    // })
-    // err = merr.CheckRPCCall(createCollectionStatus, err)
-    // s.NoError(err)
-    // log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
-
-    // // show collection
-    // showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
-    // err = merr.CheckRPCCall(showCollectionsResp, err)
-    // s.NoError(err)
-    // log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
-
-    // // insert
-    // pkColumn := integration.NewInt64FieldData(integration.Int64Field, rowNum)
-    // fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
-    // hashKeys := integration.GenerateHashKeys(rowNum)
-    // insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
-    // 	DbName:         dbName,
-    // 	CollectionName: collectionName,
-    // 	FieldsData:     []*schemapb.FieldData{pkColumn, fVecColumn},
-    // 	HashKeys:       hashKeys,
-    // 	NumRows:        uint32(rowNum),
-    // })
-    // err = merr.CheckRPCCall(insertResult, err)
-    // s.NoError(err)
-    // s.Equal(int64(rowNum), insertResult.GetInsertCnt())
-
-    // // flush
-    // flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
-    // 	DbName:          dbName,
-    // 	CollectionNames: []string{collectionName},
-    // })
-    // err = merr.CheckRPCCall(flushResp, err)
-    // s.NoError(err)
-    // segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
-    // ids := segmentIDs.GetData()
-    // s.Require().NotEmpty(segmentIDs)
-    // s.Require().True(has)
-    // flushTs, has := flushResp.GetCollFlushTs()[collectionName]
-    // s.True(has)
-    // s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
-
-    // // create index
-    // createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
-    // 	CollectionName: collectionName,
-    // 	FieldName:      integration.FloatVecField,
-    // 	IndexName:      "_default",
-    // 	ExtraParams:    integration.ConstructIndexParam(dim, indexType, metricType),
-    // })
-    // err = merr.CheckRPCCall(createIndexStatus, err)
-    // s.NoError(err)
-    // s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
-
-    // segments, err := c.MetaWatcher.ShowSegments()
-    // s.NoError(err)
-    // s.NotEmpty(segments)
-    // // stats task happened
-    // s.Equal(2, len(segments))
-    // s.Equal(int64(rowNum), segments[0].GetNumOfRows())
-
-    // // load
-    // loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
-    // 	DbName:         dbName,
-    // 	CollectionName: collectionName,
-    // })
-    // err = merr.CheckRPCCall(loadStatus, err)
-    // s.NoError(err)
-    // s.WaitForLoad(ctx, collectionName)
-
-    // // delete
-    // deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
-    // 	DbName:         dbName,
-    // 	CollectionName: collectionName,
-    // 	Expr:           fmt.Sprintf("%s < %d", integration.Int64Field, deleteCnt),
-    // })
-    // err = merr.CheckRPCCall(deleteResult, err)
-    // s.NoError(err)
-
-    // // flush l0
-    // flushResp, err = c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
-    // 	DbName:          dbName,
-    // 	CollectionNames: []string{collectionName},
-    // })
-    // err = merr.CheckRPCCall(flushResp, err)
-    // s.NoError(err)
-    // flushTs, has = flushResp.GetCollFlushTs()[collectionName]
-    // s.True(has)
-    // s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
-
-    // // query
-    // queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
-    // 	DbName:         dbName,
-    // 	CollectionName: collectionName,
-    // 	Expr:           "",
-    // 	OutputFields:   []string{"count(*)"},
-    // })
-    // err = merr.CheckRPCCall(queryResult, err)
-    // s.NoError(err)
-    // s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
-
-    // // wait for l0 compaction completed
-    // showSegments := func() bool {
-    // 	segments, err = c.MetaWatcher.ShowSegments()
-    // 	s.NoError(err)
-    // 	s.NotEmpty(segments)
-    // 	log.Info("ShowSegments result", zap.Any("segments", segments))
-    // 	flushed := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
-    // 		return segment.GetState() == commonpb.SegmentState_Flushed
-    // 	})
-    // 	if len(flushed) == 1 &&
-    // 		flushed[0].GetLevel() == datapb.SegmentLevel_L1 &&
-    // 		flushed[0].GetNumOfRows() == rowNum {
-    // 		log.Info("l0 compaction done, wait for single compaction")
-    // 	}
-    // 	return len(flushed) == 1 &&
-    // 		flushed[0].GetLevel() == datapb.SegmentLevel_L1 &&
-    // 		flushed[0].GetNumOfRows() == rowNum-deleteCnt
-    // }
-    // for !showSegments() {
-    // 	select {
-    // 	case <-ctx.Done():
-    // 		s.Fail("waiting for compaction timeout")
-    // 		return
-    // 	case <-time.After(1 * time.Second):
-    // 	}
-    // }
-
-    // // search
-    // expr := fmt.Sprintf("%s > 0", integration.Int64Field)
-    // nq := 10
-    // topk := 10
-    // roundDecimal := -1
-    // params := integration.GetSearchParams(indexType, metricType)
-    // searchReq := integration.ConstructSearchRequest("", collectionName, expr,
-    // 	integration.FloatVecField, vecType, nil, metricType, params, nq, dim, topk, roundDecimal)
-
-    // searchResult, err := c.Proxy.Search(ctx, searchReq)
-    // err = merr.CheckRPCCall(searchResult, err)
-    // s.NoError(err)
-    // s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
-
-    // // query
-    // queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{
-    // 	DbName:         dbName,
-    // 	CollectionName: collectionName,
-    // 	Expr:           "",
-    // 	OutputFields:   []string{"count(*)"},
-    // })
-    // err = merr.CheckRPCCall(queryResult, err)
-    // s.NoError(err)
-    // s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
-
-    // // release collection
-    // status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
-    // 	CollectionName: collectionName,
-    // })
-    // err = merr.CheckRPCCall(status, err)
-    // s.NoError(err)
+    ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
+    defer cancel()
+    c := s.Cluster
+
+    const (
+        dim       = 128
+        dbName    = ""
+        rowNum    = 100000
+        deleteCnt = 50000
+
+        indexType  = integration.IndexFaissIvfFlat
+        metricType = metric.L2
+        vecType    = schemapb.DataType_FloatVector
+    )
+
+    paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "1")
+    defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)
+
+    collectionName := "TestCompaction_" + funcutil.GenRandomStr()
+
+    schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, false, vecType)
+    marshaledSchema, err := proto.Marshal(schema)
+    s.NoError(err)
+
+    // create collection
+    createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
+        DbName:           dbName,
+        CollectionName:   collectionName,
+        Schema:           marshaledSchema,
+        ShardsNum:        common.DefaultShardsNum,
+        ConsistencyLevel: commonpb.ConsistencyLevel_Strong,
+    })
+    err = merr.CheckRPCCall(createCollectionStatus, err)
+    s.NoError(err)
+    log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
+
+    // show collection
+    showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
+    err = merr.CheckRPCCall(showCollectionsResp, err)
+    s.NoError(err)
+    log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
+
+    // insert
+    pkColumn := integration.NewInt64FieldData(integration.Int64Field, rowNum)
+    fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
+    hashKeys := integration.GenerateHashKeys(rowNum)
+    insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+        FieldsData:     []*schemapb.FieldData{pkColumn, fVecColumn},
+        HashKeys:       hashKeys,
+        NumRows:        uint32(rowNum),
+    })
+    err = merr.CheckRPCCall(insertResult, err)
+    s.NoError(err)
+    s.Equal(int64(rowNum), insertResult.GetInsertCnt())
+
+    // flush
+    flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
+        DbName:          dbName,
+        CollectionNames: []string{collectionName},
+    })
+    err = merr.CheckRPCCall(flushResp, err)
+    s.NoError(err)
+    segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
+    ids := segmentIDs.GetData()
+    s.Require().NotEmpty(segmentIDs)
+    s.Require().True(has)
+    flushTs, has := flushResp.GetCollFlushTs()[collectionName]
+    s.True(has)
+    s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
+
+    // create index
+    createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+        CollectionName: collectionName,
+        FieldName:      integration.FloatVecField,
+        IndexName:      "_default",
+        ExtraParams:    integration.ConstructIndexParam(dim, indexType, metricType),
+    })
+    err = merr.CheckRPCCall(createIndexStatus, err)
+    s.NoError(err)
+    s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
+
+    segments, err := c.MetaWatcher.ShowSegments()
+    s.NoError(err)
+    s.NotEmpty(segments)
+    // stats task happened
+    s.Equal(2, len(segments))
+    s.Equal(int64(rowNum), segments[0].GetNumOfRows())
+
+    // load
+    loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+    })
+    err = merr.CheckRPCCall(loadStatus, err)
+    s.NoError(err)
+    s.WaitForLoad(ctx, collectionName)
+
+    // delete
+    deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+        Expr:           fmt.Sprintf("%s < %d", integration.Int64Field, deleteCnt),
+    })
+    err = merr.CheckRPCCall(deleteResult, err)
+    s.NoError(err)
+
+    // flush l0
+    flushResp, err = c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
+        DbName:          dbName,
+        CollectionNames: []string{collectionName},
+    })
+    err = merr.CheckRPCCall(flushResp, err)
+    s.NoError(err)
+    flushTs, has = flushResp.GetCollFlushTs()[collectionName]
+    s.True(has)
+    s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
+
+    // query
+    queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+        Expr:           "",
+        OutputFields:   []string{"count(*)"},
+    })
+    err = merr.CheckRPCCall(queryResult, err)
+    s.NoError(err)
+    s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
+
+    // wait for l0 compaction completed
+    showSegments := func() bool {
+        segments, err = c.MetaWatcher.ShowSegments()
+        s.NoError(err)
+        s.NotEmpty(segments)
+        log.Info("ShowSegments result", zap.Any("segments", segments))
+        flushed := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
+            return segment.GetState() == commonpb.SegmentState_Flushed
+        })
+        if len(flushed) == 1 &&
+            flushed[0].GetLevel() == datapb.SegmentLevel_L1 &&
+            flushed[0].GetNumOfRows() == rowNum {
+            log.Info("l0 compaction done, wait for single compaction")
+        }
+        return len(flushed) == 1 &&
+            flushed[0].GetLevel() == datapb.SegmentLevel_L1 &&
+            flushed[0].GetNumOfRows() == rowNum-deleteCnt
+    }
+    for !showSegments() {
+        select {
+        case <-ctx.Done():
+            s.Fail("waiting for compaction timeout")
+            return
+        case <-time.After(1 * time.Second):
+        }
+    }
+
+    // search
+    expr := fmt.Sprintf("%s > 0", integration.Int64Field)
+    nq := 10
+    topk := 10
+    roundDecimal := -1
+    params := integration.GetSearchParams(indexType, metricType)
+    searchReq := integration.ConstructSearchRequest("", collectionName, expr,
+        integration.FloatVecField, vecType, nil, metricType, params, nq, dim, topk, roundDecimal)
+
+    searchResult, err := c.Proxy.Search(ctx, searchReq)
+    err = merr.CheckRPCCall(searchResult, err)
+    s.NoError(err)
+    s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
+
+    // query
+    queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+        Expr:           "",
+        OutputFields:   []string{"count(*)"},
+    })
+    err = merr.CheckRPCCall(queryResult, err)
+    s.NoError(err)
+    s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
+
+    // release collection
+    status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
+        CollectionName: collectionName,
+    })
+    err = merr.CheckRPCCall(status, err)
+    s.NoError(err)
 
     // drop collection
     // status, err = c.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{
-    // 	CollectionName:   collectionName,
+    // 	CollectionName: collectionName,
     // })
     // err = merr.CheckRPCCall(status, err)
     // s.NoError(err)
-    // log.Info("Test compaction succeed")
+    log.Info("Test compaction succeed")
 }
diff --git a/tests/integration/import/dynamic_field_test.go b/tests/integration/import/dynamic_field_test.go
index 14b1ab1c7d89f..f0d89dca486b0 100644
--- a/tests/integration/import/dynamic_field_test.go
+++ b/tests/integration/import/dynamic_field_test.go
@@ -47,7 +47,7 @@ func (s *BulkInsertSuite) testImportDynamicField() {
     )
 
     c := s.Cluster
-    ctx, cancel := context.WithTimeout(c.GetContext(), 120*time.Second)
+    ctx, cancel := context.WithTimeout(c.GetContext(), 240*time.Second)
     defer cancel()
 
     collectionName := "TestBulkInsert_B_" + funcutil.GenRandomStr()
diff --git a/tests/integration/import/import_test.go b/tests/integration/import/import_test.go
index a979cd1186306..8891428949023 100644
--- a/tests/integration/import/import_test.go
+++ b/tests/integration/import/import_test.go
@@ -76,7 +76,7 @@ func (s *BulkInsertSuite) run() {
     )
 
     c := s.Cluster
-    ctx, cancel := context.WithTimeout(c.GetContext(), 120*time.Second)
+    ctx, cancel := context.WithTimeout(c.GetContext(), 240*time.Second)
     defer cancel()
 
     collectionName := "TestBulkInsert" + funcutil.GenRandomStr()
diff --git a/tests/integration/import/partition_key_test.go b/tests/integration/import/partition_key_test.go
index 8ff58df999160..73fc98e5ab063 100644
--- a/tests/integration/import/partition_key_test.go
+++ b/tests/integration/import/partition_key_test.go
@@ -46,7 +46,7 @@ func (s *BulkInsertSuite) TestImportWithPartitionKey() {
     )
 
     c := s.Cluster
-    ctx, cancel := context.WithTimeout(c.GetContext(), 120*time.Second)
+    ctx, cancel := context.WithTimeout(c.GetContext(), 240*time.Second)
     defer cancel()
 
     collectionName := "TestBulkInsert_WithPartitionKey_" + funcutil.GenRandomStr()
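
Note on the compaction_trigger_v2.go hunks: setL0Triggering(false) now broadcasts on l0TickSig, and the trigger loop also clears the flag when l0Policy.Trigger() fails, so a goroutine blocked in PauseL0SegmentCompacting (as import_checker.go's checkIndexBuildingJob does before waiting out L0 import tasks) is always woken. Below is a minimal, self-contained sketch of that condition-variable gate; the names l0Gate, setTriggering, and waitUntilIdle are hypothetical stand-ins for illustration, not the actual Milvus API, which guards l0Triggering with l0SigLock and signals through l0TickSig.

package main

import "sync"

// l0Gate is a hypothetical reduction of CompactionTriggerManager's
// l0SigLock/l0TickSig/l0Triggering trio.
type l0Gate struct {
	mu         sync.Mutex
	cond       *sync.Cond
	triggering bool
}

func newL0Gate() *l0Gate {
	g := &l0Gate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// setTriggering mirrors the patched setL0Triggering: clearing the flag
// broadcasts so every blocked waiter re-checks its predicate. Without the
// broadcast, and without resetting the flag on the Trigger() error path,
// a waiter could block forever on a flag that never flips back.
func (g *l0Gate) setTriggering(b bool) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.triggering = b
	if !b {
		g.cond.Broadcast()
	}
}

// waitUntilIdle blocks while an L0 trigger round is in flight, the way an
// import job waits before pausing L0 compaction for its collection.
func (g *l0Gate) waitUntilIdle() {
	g.mu.Lock()
	defer g.mu.Unlock()
	for g.triggering {
		g.cond.Wait()
	}
}

func main() {
	g := newL0Gate()
	g.setTriggering(true)

	done := make(chan struct{})
	go func() {
		g.waitUntilIdle() // parks until the broadcast below
		close(done)
	}()

	g.setTriggering(false) // wakes the waiter
	<-done
}

Broadcast rather than Signal is the safer choice in this pattern: several import jobs may pause L0 compaction concurrently, and each woken waiter re-checks its own condition before proceeding.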