Skip to content

Commit

Permalink
fix TestImportV2 unit case
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Feb 7, 2025
1 parent f185ac5 commit 49a4625
Show file tree
Hide file tree
Showing 18 changed files with 274 additions and 251 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ jobs:
name: Integration Test
needs: Build
runs-on: ubuntu-latest
timeout-minutes: 120
timeout-minutes: 150
steps:
- name: Maximize build space
uses: easimon/maximize-build-space@master
Expand Down
4 changes: 4 additions & 0 deletions internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func (m *CompactionTriggerManager) setL0Triggering(b bool) {
m.l0SigLock.Lock()
defer m.l0SigLock.Unlock()
m.l0Triggering = b
if !b {
m.l0TickSig.Broadcast()
}
}

func (m *CompactionTriggerManager) startLoop() {
Expand Down Expand Up @@ -179,6 +182,7 @@ func (m *CompactionTriggerManager) startLoop() {
events, err := m.l0Policy.Trigger()
if err != nil {
log.Warn("Fail to trigger L0 policy", zap.Error(err))
m.setL0Triggering(false)
continue
}
ctx := context.Background()
Expand Down
9 changes: 7 additions & 2 deletions internal/datacoord/import_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *importChecker) Start() {
log.Info("waiting for all channels to send signals",
zap.Strings("vchannels", job.GetVchannels()),
zap.Strings("readyVchannels", job.GetReadyVchannels()),
zap.Int64("jobID", job.GetJobID())) // TODO fubang
zap.Int64("jobID", job.GetJobID()))
continue
}
switch job.GetState() {
Expand Down Expand Up @@ -215,7 +215,6 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()))
lacks := c.getLackFilesForPreImports(job)
if len(lacks) == 0 {
log.Info("import job start to preimport") // TODO fubang
return
}
fileGroups := lo.Chunk(lacks, Params.DataCoordCfg.FilesPerPreImportTask.GetAsInt())
Expand Down Expand Up @@ -395,11 +394,17 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
}

// wait l0 segment import and block l0 compaction
log.Info("start to pause l0 segment compacting", zap.Int64("jobID", job.GetJobID()))
c.l0CompactionTrigger.PauseL0SegmentCompacting(job.GetCollectionID())
log.Info("l0 segment compacting paused", zap.Int64("jobID", job.GetJobID()))
defer c.l0CompactionTrigger.ResumeL0SegmentCompacting(job.GetCollectionID())
l0ImportTasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()), WithL0CompactionSource())
for _, t := range l0ImportTasks {
if t.GetState() != datapb.ImportTaskStateV2_Completed {
log.Info("waiting for l0 import task...",
zap.Int64s("taskIDs", lo.Map(l0ImportTasks, func(t ImportTask, _ int) int64 {
return t.GetTaskID()
})))
return
}
}
Expand Down
8 changes: 3 additions & 5 deletions internal/datacoord/import_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/indexpb"
"github.com/milvus-io/milvus/pkg/proto/internalpb"
"github.com/milvus-io/milvus/pkg/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
Expand Down Expand Up @@ -528,7 +529,6 @@ func TestImportCheckerCompaction(t *testing.T) {
catalog.EXPECT().ListImportJobs(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
Expand All @@ -543,11 +543,9 @@ func TestImportCheckerCompaction(t *testing.T) {
imeta, err := NewImportMeta(context.TODO(), catalog)
assert.NoError(t, err)

meta, err := newMeta(context.TODO(), catalog, nil)
assert.NoError(t, err)

broker := broker2.NewMockBroker(t)

broker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{}, nil)
meta, err := newMeta(context.TODO(), catalog, nil, broker)
sjm := NewMockStatsJobManager(t)
l0CompactionTrigger := NewMockTriggerManager(t)
l0CompactionTrigger.EXPECT().PauseL0SegmentCompacting(mock.Anything).Return().Maybe()
Expand Down
14 changes: 9 additions & 5 deletions internal/datacoord/import_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/broker"
broker2 "github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/metastore/mocks"
mocks2 "github.com/milvus-io/milvus/internal/mocks"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/indexpb"
"github.com/milvus-io/milvus/pkg/proto/internalpb"
"github.com/milvus-io/milvus/pkg/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -165,17 +167,18 @@ func TestImportUtil_NewImportTasksWithDataTt(t *testing.T) {
alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil)

catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil)

meta, err := newMeta(context.TODO(), catalog, nil)
broker := broker2.NewMockBroker(t)
broker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{}, nil)
meta, err := newMeta(context.TODO(), catalog, nil, broker)
assert.NoError(t, err)

tasks, err := NewImportTasks(fileGroups, job, alloc, meta)
Expand Down Expand Up @@ -285,7 +288,6 @@ func TestImportUtil_AssembleRequestWithDataTt(t *testing.T) {
}

catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
Expand All @@ -301,7 +303,9 @@ func TestImportUtil_AssembleRequestWithDataTt(t *testing.T) {
return id, id + n, nil
})

meta, err := newMeta(context.TODO(), catalog, nil)
broker := broker2.NewMockBroker(t)
broker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{}, nil)
meta, err := newMeta(context.TODO(), catalog, nil, broker)
assert.NoError(t, err)
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 5, IsImporting: true},
Expand Down
16 changes: 4 additions & 12 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ func (s *Server) initDataCoord() error {
}
log.Info("init service discovery done")

s.importMeta, err = NewImportMeta(s.ctx, s.meta.catalog)
if err != nil {
return err
}
s.initCompaction()
log.Info("init compaction done")

Expand All @@ -391,13 +395,6 @@ func (s *Server) initDataCoord() error {
s.initJobManager()
log.Info("init statsJobManager done")

s.importMeta, err = NewImportMeta(s.ctx, s.meta.catalog)
if err != nil {
return err
}
s.initCompaction()
log.Info("init compaction done")

if err = s.initSegmentManager(); err != nil {
return err
}
Expand Down Expand Up @@ -706,15 +703,10 @@ func (s *Server) initIndexNodeManager() {
}

func (s *Server) initCompaction() {
<<<<<<< HEAD
cph := newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.handler)
cph.loadMeta()
s.compactionHandler = cph
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
=======
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.taskScheduler, s.handler)
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta, s.importMeta)
>>>>>>> support to replicate import msg
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}

Expand Down
9 changes: 9 additions & 0 deletions internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
mocks2 "github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
Expand Down Expand Up @@ -1440,6 +1442,13 @@ func TestImportV2(t *testing.T) {
catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
wal := mock_streaming.NewMockWALAccesser(t)
b := mock_streaming.NewMockBroadcast(t)
wal.EXPECT().Broadcast().Return(b)
b.EXPECT().BlockUntilResourceKeyAckOnce(mock.Anything, mock.Anything).Return(nil)
streaming.SetWALForTest(wal)
defer streaming.RecoverWALForTest()

s.importMeta, err = NewImportMeta(context.TODO(), catalog)
assert.NoError(t, err)
resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
Expand Down
21 changes: 15 additions & 6 deletions internal/datanode/msghandlerimpl/msg_handler_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@
package msghandlerimpl

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func TestMsgHandlerImpl(t *testing.T) {
paramtable.Init()
ctx := context.Background()
b := broker.NewMockBroker(t)
m := NewMsgHandlerImpl(b)
assert.Panics(t, func() {
Expand All @@ -39,14 +45,17 @@ func TestMsgHandlerImpl(t *testing.T) {
assert.Panics(t, func() {
m.HandleManualFlush("", nil)
})
t.Run("HandleImport error", func(t *testing.T) {
b.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
err := m.HandleImport(nil, "", nil)
assert.Error(t, err)
})
t.Run("HandleImport success", func(t *testing.T) {
wal := mock_streaming.NewMockWALAccesser(t)
bo := mock_streaming.NewMockBroadcast(t)
wal.EXPECT().Broadcast().Return(bo)
bo.EXPECT().Ack(mock.Anything, mock.Anything).Return(nil)
streaming.SetWALForTest(wal)
defer streaming.RecoverWALForTest()

b.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
b.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(nil, nil).Once()
err := m.HandleImport(nil, "", nil)
err := m.HandleImport(ctx, "", nil)
assert.NoError(t, err)
})
}
3 changes: 3 additions & 0 deletions internal/flushcommon/pipeline/flow_graph_dd_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
}
case commonpb.MsgType_Import:
importMsg := msg.(*msgstream.ImportMsg)
if importMsg.GetCollectionID() != ddn.collectionID {
continue
}
logger := log.With(
zap.String("vchannel", ddn.Name()),
zap.Int32("msgType", int32(msg.Type())),
Expand Down
3 changes: 1 addition & 2 deletions internal/proxy/task_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ func (it *importTask) getChannels() []pChan {
}

func (it *importTask) Execute(ctx context.Context) error {
// TODO fubang should send mq msg
jobID, err := it.node.rowIDAllocator.AllocOne()
if err != nil {
log.Ctx(ctx).Warn("alloc job id failed", zap.Error(err))
Expand Down Expand Up @@ -249,7 +248,7 @@ func (it *importTask) Execute(ctx context.Context) error {
log.Ctx(ctx).Warn("broadcast import msg failed", zap.Error(err))
return err
}
log.Ctx(ctx).Debug(
log.Ctx(ctx).Info(
"broadcast import msg success",
zap.Int64("jobID", jobID),
zap.Uint64("broadcastID", resp.BroadcastID),
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/observers/replica_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() {

func (suite *ReplicaObserverSuite) TestCheckSQnodesInReplica() {
balancer := mock_balancer.NewMockBalancer(suite.T())
snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)

change := make(chan struct{})
balancer.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error {
Expand Down Expand Up @@ -249,6 +248,7 @@ func (suite *ReplicaObserverSuite) TestCheckSQnodesInReplica() {
<-ctx.Done()
return context.Cause(ctx)
})
snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)

ctx := context.Background()
err := suite.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(suite.collectionID, 2))
Expand Down
1 change: 0 additions & 1 deletion pkg/proto/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,6 @@ message CompactionPlanResult {
repeated CompactionSegment segments = 3;
string channel = 4;
CompactionType type = 5;
// l0 import file path
}

message CompactionStateResponse {
Expand Down
2 changes: 1 addition & 1 deletion pkg/proto/datapb/data_coord.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions scripts/run_intergration_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ for d in $(go list ./tests/integration/...); do
if [[ $d == *"coordrecovery"* ]]; then
echo "running coordrecovery"
# simplified command to speed up coord init test since it is large.
$TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=30m
$TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=50m -timeout=120m
else
$TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=15m -timeout=30m
$TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=50m -timeout=120m
fi
if [ -f profile.out ]; then
grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO}
Expand Down
Loading

0 comments on commit 49a4625

Please sign in to comment.