diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index fd2ef164892a1..a4217cbb93fa3 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -159,7 +159,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 { return partition.PartitionID }) - allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs) channelInfos := make(map[string][]*datapb.VchannelInfo) segments := make(map[int64]*datapb.SegmentInfo, 0) @@ -194,7 +193,8 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { return nil } - mgr.next.updateCollectionTarget(collectionID, NewCollectionTarget(segments, dmChannels, partitionIDs)) + allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs) + mgr.next.updateCollectionTarget(collectionID, allocatedTarget) log.Debug("finish to update next targets for collection", zap.Int64("collectionID", collectionID), zap.Int64s("PartitionIDs", partitionIDs), diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 83bddf48360bf..d586341548c9c 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -87,8 +87,12 @@ type TargetObserver struct { mut sync.Mutex // Guard readyNotifiers readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers - dispatcher *taskDispatcher[int64] - keylocks *lock.KeyLock[int64] + // loadingDispatcher updates targets for collections that are loading (also collections without a current target). + loadingDispatcher *taskDispatcher[int64] + // loadedDispatcher updates targets for loaded collections. + loadedDispatcher *taskDispatcher[int64] + + keylocks *lock.KeyLock[int64] startOnce sync.Once stopOnce sync.Once @@ -114,8 +118,8 @@ func NewTargetObserver( keylocks: lock.NewKeyLock[int64](), } - dispatcher := newTaskDispatcher(result.check) - result.dispatcher = dispatcher + result.loadingDispatcher = newTaskDispatcher(result.check) + result.loadedDispatcher = newTaskDispatcher(result.check) return result } @@ -124,7 +128,8 @@ func (ob *TargetObserver) Start() { ctx, cancel := context.WithCancel(context.Background()) ob.cancel = cancel - ob.dispatcher.Start() + ob.loadingDispatcher.Start() + ob.loadedDispatcher.Start() ob.wg.Add(1) go func() { @@ -144,7 +149,8 @@ func (ob *TargetObserver) Stop() { } ob.wg.Wait() - ob.dispatcher.Stop() + ob.loadingDispatcher.Stop() + ob.loadedDispatcher.Stop() }) } @@ -167,7 +173,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) { case <-ticker.C: ob.clean() - ob.dispatcher.AddTask(ob.meta.GetAll()...) + loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) { + if collection.GetStatus() == querypb.LoadStatus_Loaded { + return collection.GetCollectionID(), true + } + return 0, false + }) + ob.loadedDispatcher.AddTask(loaded...) case req := <-ob.updateChan: log.Info("manually trigger update target", @@ -217,7 +229,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) { func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool { result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID) if !result { - ob.dispatcher.AddTask(collectionID) + ob.loadingDispatcher.AddTask(collectionID) } return result } diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 45d804b0f897e..026626ef17d7e 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -90,7 +90,9 @@ func (suite *TargetObserverSuite) SetupTest() { suite.collectionID = int64(1000) suite.partitionID = int64(100) - err = suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1)) + testCollection := utils.CreateTestCollection(suite.collectionID, 1) + testCollection.Status = querypb.LoadStatus_Loaded + err = suite.meta.CollectionManager.PutCollection(testCollection) suite.NoError(err) err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID)) suite.NoError(err) @@ -302,7 +304,8 @@ func (suite *TargetObserverCheckSuite) SetupTest() { func (s *TargetObserverCheckSuite) TestCheck() { r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID) s.False(r) - s.True(s.observer.dispatcher.tasks.Contain(s.collectionID)) + s.False(s.observer.loadedDispatcher.tasks.Contain(s.collectionID)) + s.True(s.observer.loadingDispatcher.tasks.Contain(s.collectionID)) } func TestTargetObserver(t *testing.T) {