Skip to content

Commit

Permalink
Refactor task scheduler and executor (milvus-io#20828)
Browse files Browse the repository at this point in the history
Make the performance able to scale out

Signed-off-by: yah01 <[email protected]>

Signed-off-by: yah01 <[email protected]>
  • Loading branch information
yah01 authored Nov 30, 2022
1 parent 18762f8 commit 060649b
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 92 deletions.
2 changes: 1 addition & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ ignore:
- "**/*.pb.go"
- "**/*.proto"
- "internal/metastore/db/dbmodel/mocks/.*"
- "internal/mocks"
- "**/mock_*.go"



1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ queryCoord:
loadTimeoutSeconds: 600
checkHandoffInterval: 5000
taskMergeCap: 16
taskExecutionCap: 256
enableActiveStandby: false # Enable active-standby

# Related configuration of queryNode, used to run hybrid search between vector and scalar data.
Expand Down
3 changes: 3 additions & 0 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func (s *Server) Start() error {
}
for _, node := range sessions {
s.nodeMgr.Add(session.NewNodeInfo(node.ServerID, node.Address))
s.taskScheduler.AddExecutor(node.ServerID)
}
s.checkReplicas()
for _, node := range sessions {
Expand Down Expand Up @@ -571,6 +572,7 @@ func (s *Server) watchNodes(revision int64) {

func (s *Server) handleNodeUp(node int64) {
log := log.With(zap.Int64("nodeID", node))
s.taskScheduler.AddExecutor(node)
s.distController.StartDistInstance(s.ctx, node)

for _, collection := range s.meta.CollectionManager.GetAll() {
Expand Down Expand Up @@ -598,6 +600,7 @@ func (s *Server) handleNodeUp(node int64) {

func (s *Server) handleNodeDown(node int64) {
log := log.With(zap.Int64("nodeID", node))
s.taskScheduler.RemoveExecutor(node)
s.distController.Remove(node)

// Refresh the targets, to avoid consuming messages too early from channel
Expand Down
22 changes: 15 additions & 7 deletions internal/querycoordv2/task/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -45,7 +47,8 @@ type Executor struct {
// Merge load segment requests
merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest]

executingTasks sync.Map
executingTasks sync.Map
executingTaskNum atomic.Int32
}

func NewExecutor(meta *meta.Meta,
Expand Down Expand Up @@ -82,10 +85,14 @@ func (ex *Executor) Stop() {
// does nothing and returns false if the action is already committed,
// returns true otherwise.
func (ex *Executor) Execute(task Task, step int) bool {
if ex.executingTaskNum.Load() > Params.QueryCoordCfg.TaskExecutionCap {
return false
}
_, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{})
if exist {
return false
}
ex.executingTaskNum.Inc()

log := log.With(
zap.Int64("taskID", task.ID()),
Expand Down Expand Up @@ -137,7 +144,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
defer func() {
for i := range mergeTask.tasks {
mergeTask.tasks[i].SetErr(task.Err())
ex.removeAction(mergeTask.tasks[i], mergeTask.steps[i])
ex.removeTask(mergeTask.tasks[i], mergeTask.steps[i])
}
}()

Expand Down Expand Up @@ -180,7 +187,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
log.Info("load segments done", zap.Int64("taskID", task.ID()), zap.Duration("timeTaken", elapsed))
}

func (ex *Executor) removeAction(task Task, step int) {
func (ex *Executor) removeTask(task Task, step int) {
if task.Err() != nil {
log.Info("excute action done, remove it",
zap.Int64("taskID", task.ID()),
Expand All @@ -189,6 +196,7 @@ func (ex *Executor) removeAction(task Task, step int) {
}

ex.executingTasks.Delete(task.ID())
ex.executingTaskNum.Dec()
}

func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
Expand Down Expand Up @@ -218,7 +226,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
if err != nil {
task.SetErr(err)
task.Cancel()
ex.removeAction(task, step)
ex.removeTask(task, step)
}
}()

Expand Down Expand Up @@ -270,7 +278,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
}

func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
defer ex.removeAction(task, step)
defer ex.removeTask(task, step)
startTs := time.Now()
action := task.Actions()[step].(*SegmentAction)
defer action.isReleaseCommitted.Store(true)
Expand Down Expand Up @@ -343,7 +351,7 @@ func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
}

func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
defer ex.removeAction(task, step)
defer ex.removeTask(task, step)
startTs := time.Now()
action := task.Actions()[step].(*ChannelAction)
log := log.With(
Expand Down Expand Up @@ -415,7 +423,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
}

func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
defer ex.removeAction(task, step)
defer ex.removeTask(task, step)
startTs := time.Now()
action := task.Actions()[step].(*ChannelAction)
log := log.With(
Expand Down
58 changes: 57 additions & 1 deletion internal/querycoordv2/task/mock_scheduler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 060649b

Please sign in to comment.