Add more log when creating index, Finished/Failed index will not be counted in total index tasks (milvus-io#19150)

Signed-off-by: xiaofan-luan <[email protected]>

xiaofan-luan authored Sep 23, 2022
1 parent 17f5e3c commit 901d3fb
Showing 5 changed files with 34 additions and 18 deletions.
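
The behavioral change is in IndexTaskQueue.GetTaskNum (task_scheduler.go below): tasks that have already reached the Finished or Failed state no longer count toward the active total reported by GetJobStats. A minimal, self-contained Go sketch of that counting rule — the names and the local IndexState type are simplified stand-ins for the real commonpb enum and the locked activeTasks map, not the actual implementation:

package main

import "fmt"

// IndexState is a simplified stand-in for commonpb.IndexState.
type IndexState int

const (
    IndexStateNone IndexState = iota
    Unissued
    InProgress
    Finished
    Failed
)

// countActive returns how many tasks are still doing work; Finished and
// Failed tasks are skipped, which is the rule this commit introduces.
func countActive(states []IndexState) int {
    active := 0
    for _, s := range states {
        if s != Finished && s != Failed {
            active++
        }
    }
    return active
}

func main() {
    states := []IndexState{InProgress, Finished, Failed, InProgress}
    // Before this change all four tasks were counted; now only two are.
    fmt.Println(countActive(states)) // 2
}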
13 changes: 7 additions & 6 deletions internal/indexnode/indexnode_service.go
@@ -181,25 +181,26 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
},
}, nil
}
- utNum, atNum := i.sched.IndexBuildQueue.GetTaskNum()
+ unissued, active := i.sched.IndexBuildQueue.GetTaskNum()
jobInfos := make([]*indexpb.JobInfo, 0)
i.foreachTaskInfo(func(ClusterID string, buildID UniqueID, info *taskInfo) {
if info.statistic != nil {
jobInfos = append(jobInfos, proto.Clone(info.statistic).(*indexpb.JobInfo))
}
})
slots := 0
- if i.sched.buildParallel > utNum+atNum {
- slots = i.sched.buildParallel - utNum - atNum
+ if i.sched.buildParallel > unissued+active {
+ slots = i.sched.buildParallel - unissued - active
}
logutil.Logger(ctx).Info("Get Index Job Stats", zap.Int("Unissued", unissued), zap.Int("Active", active), zap.Int("Slot", slots))
return &indexpb.GetJobStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
- TotalJobNum: int64(utNum) + int64(atNum),
- InProgressJobNum: int64(atNum),
- EnqueueJobNum: int64(utNum),
+ TotalJobNum: int64(active) + int64(unissued),
+ InProgressJobNum: int64(active),
+ EnqueueJobNum: int64(unissued),
TaskSlots: int64(slots),
JobInfos: jobInfos,
}, nil
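
For reference, the job-stats arithmetic after this change, pulled out as a standalone sketch — a simplified struct rather than the generated GetJobStatsResponse, with inputs corresponding to the unissued/active counts and the scheduler's buildParallel above:

// jobStats mirrors the response fields computed in GetJobStats above
// (a simplified sketch, not the generated protobuf type).
type jobStats struct {
    Total, InProgress, Enqueued, Slots int64
}

// computeJobStats reproduces the arithmetic: totals are unissued plus
// still-active tasks, and free build slots are clamped at zero.
func computeJobStats(unissued, active, buildParallel int) jobStats {
    slots := 0
    if buildParallel > unissued+active {
        slots = buildParallel - unissued - active
    }
    return jobStats{
        Total:      int64(unissued) + int64(active),
        InProgress: int64(active),
        Enqueued:   int64(unissued),
        Slots:      int64(slots),
    }
}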
2 changes: 2 additions & 0 deletions internal/indexnode/indexnode_service_test.go
@@ -96,6 +96,7 @@ func TestIndexNodeSimple(t *testing.T) {
case <-timeout:
t.Fatal("timeout for querying jobs")
default:
+ time.Sleep(1 * time.Millisecond)
resp, err := in.QueryJobs(ctx, queryJob)
assert.Nil(t, err)
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_Success)
@@ -109,6 +110,7 @@ func TestIndexNodeSimple(t *testing.T) {
}
}
}
+
}
}

21 changes: 13 additions & 8 deletions internal/indexnode/task.go
@@ -129,11 +129,7 @@ func (it *indexBuildTask) SetState(state commonpb.IndexState, failReason string)
}

func (it *indexBuildTask) GetState() commonpb.IndexState {
- state, ok := it.node.loadTaskState(it.ClusterID, it.BuildID)
- if !ok {
- return commonpb.IndexState_IndexStateNone
- }
- return state
+ return it.node.loadTaskState(it.ClusterID, it.BuildID)
}

// OnEnqueue enqueues indexing tasks.
@@ -145,6 +141,7 @@ func (it *indexBuildTask) OnEnqueue(ctx context.Context) error {
}

func (it *indexBuildTask) Prepare(ctx context.Context) error {
logutil.Logger(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
typeParams := make(map[string]string)
indexParams := make(map[string]string)

@@ -200,6 +197,7 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
// ignore error
}
}
logutil.Logger(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
// setup chunkmanager
// opts := make([]storage.Option, 0)
// // TODO: secret access key_id
@@ -263,7 +261,13 @@ func (it *indexBuildTask) LoadData(ctx context.Context) error {
it.tr.Record("load field data done")
metrics.IndexNodeLoadFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(loadVectorDuration))

- return it.decodeBlobs(ctx, blobs)
+ err = it.decodeBlobs(ctx, blobs)
+ if err != nil {
+ logutil.Logger(ctx).Info("failed to decode blobs", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID), zap.Error(err))
+ } else {
+ logutil.Logger(ctx).Info("Successfully load data", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
+ }
+ return err
}

func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
@@ -330,6 +334,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
encodeIndexFileDur := it.tr.Record("index codec serialize done")
metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(encodeIndexFileDur.Milliseconds()))
it.indexBlobs = serializedIndexBlobs
logutil.Logger(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
return nil
}

@@ -457,8 +462,8 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
saveIndexFileDur := it.tr.Record("index file save done")
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
it.tr.Elapse("index building all done")
logutil.Logger(ctx).Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID),
zap.Int64("partition", it.partitionID), zap.Int64("segment", it.segmentID))
logutil.Logger(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID),
zap.Int64("partition", it.partitionID), zap.Int64("SegmentId", it.segmentID))
return nil
}
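
The task.go changes above are mostly stage-by-stage structured logs keyed by build, collection, and segment IDs (Prepare, LoadData, BuildIndex, SaveIndexFiles). A hypothetical helper sketching that convention with zap; the real code logs through logutil.Logger(ctx) inline rather than via a helper like this:

import "go.uber.org/zap"

// logStage illustrates the logging convention: every message carries the
// build, collection, and segment identifiers, plus the error when one occurred.
func logStage(logger *zap.Logger, msg string, buildID, collectionID, segmentID int64, err error) {
    fields := []zap.Field{
        zap.Int64("buildID", buildID),
        zap.Int64("collectionID", collectionID),
        zap.Int64("segmentID", segmentID),
    }
    if err != nil {
        fields = append(fields, zap.Error(err))
    }
    logger.Info(msg, fields...)
}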

9 changes: 7 additions & 2 deletions internal/indexnode/task_scheduler.go
@@ -141,8 +141,13 @@ func (queue *IndexTaskQueue) GetTaskNum() (int, int) {
defer queue.atLock.Unlock()

utNum := queue.unissuedTasks.Len()
- atNum := len(queue.activeTasks)
-
+ atNum := 0
+ // remove the finished task
+ for _, task := range queue.activeTasks {
+ if task.GetState() != commonpb.IndexState_Finished && task.GetState() != commonpb.IndexState_Failed {
+ atNum++
+ }
+ }
return utNum, atNum
}

7 changes: 5 additions & 2 deletions internal/indexnode/taskinfo_ops.go
@@ -21,12 +21,15 @@ func (i *IndexNode) loadOrStoreTask(ClusterID string, buildID UniqueID, info *ta
return nil
}

- func (i *IndexNode) loadTaskState(ClusterID string, buildID UniqueID) (commonpb.IndexState, bool) {
+ func (i *IndexNode) loadTaskState(ClusterID string, buildID UniqueID) commonpb.IndexState {
key := taskKey{ClusterID: ClusterID, BuildID: buildID}
i.stateLock.Lock()
defer i.stateLock.Unlock()
task, ok := i.tasks[key]
- return task.state, ok
+ if !ok {
+ return commonpb.IndexState_IndexStateNone
+ }
+ return task.state
}

func (i *IndexNode) storeTaskState(ClusterID string, buildID UniqueID, state commonpb.IndexState, failReason string) {
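
The signature change above simplifies callers such as GetState in task.go: a missing task now maps to IndexState_IndexStateNone instead of a separate (state, ok) pair. A sketch of that lookup contract with simplified types — the real map is keyed by ClusterID plus BuildID and guarded by stateLock:

// indexState is a simplified stand-in for commonpb.IndexState.
type indexState int

const indexStateNone indexState = 0

// taskStates stands in for the IndexNode.tasks map, keyed here by build ID only.
type taskStates map[int64]indexState

// load mirrors the new loadTaskState contract: unknown tasks report
// indexStateNone rather than returning a second "found" flag.
func (t taskStates) load(buildID int64) indexState {
    if state, ok := t[buildID]; ok {
        return state
    }
    return indexStateNone
}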
