From 525201decc3e6ba7fcf5854fdb185e07c2d0c617 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 15 Sep 2022 15:44:31 +0800 Subject: [PATCH] Mark index meta as deleted when segment is compacted (#19163) Signed-off-by: cai.zhang Signed-off-by: cai.zhang --- internal/indexcoord/flush_segment_watcher.go | 392 +++------- .../flushed_segment_watcher_test.go | 477 ++++++------ internal/indexcoord/garbage_collector.go | 23 +- internal/indexcoord/garbage_collector_test.go | 678 ++++++++++-------- internal/indexcoord/index_builder.go | 22 +- internal/indexcoord/index_coord.go | 24 +- internal/indexcoord/meta_table.go | 69 +- internal/indexcoord/meta_table_test.go | 42 +- internal/indexcoord/node_manager_test.go | 258 ++++--- internal/indexcoord/task_state.go | 7 +- 10 files changed, 968 insertions(+), 1024 deletions(-) diff --git a/internal/indexcoord/flush_segment_watcher.go b/internal/indexcoord/flush_segment_watcher.go index 421a7c58126ad..bcc94c4d68e6e 100644 --- a/internal/indexcoord/flush_segment_watcher.go +++ b/internal/indexcoord/flush_segment_watcher.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util" + "github.com/milvus-io/milvus/internal/util/logutil" ) type flushedSegmentWatcher struct { @@ -47,9 +48,7 @@ type flushedSegmentWatcher struct { scheduleDuration time.Duration internalTaskMutex sync.RWMutex - childrenTaskMutex sync.RWMutex internalNotify chan struct{} - childrenNotify chan struct{} etcdRevision int64 watchChan clientv3.WatchChan @@ -59,8 +58,6 @@ type flushedSegmentWatcher struct { ic *IndexCoord internalTasks map[UniqueID]*internalTask - // segmentID -> indexID -> flushedSegmentTask, if there is no index or no need to build index, indexID is zero. 
- childrenTasks map[UniqueID]map[UniqueID]*childrenTask } type internalTask struct { @@ -68,11 +65,6 @@ type internalTask struct { segmentInfo *datapb.SegmentInfo } -type childrenTask struct { - internalTask - indexInfo *querypb.FieldIndexInfo -} - func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable, builder *indexBuilder, ic *IndexCoord) (*flushedSegmentWatcher, error) { ctx, cancel := context.WithCancel(ctx) fsw := &flushedSegmentWatcher{ @@ -81,10 +73,8 @@ func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable, kvClient: kv, wg: sync.WaitGroup{}, internalTaskMutex: sync.RWMutex{}, - childrenTaskMutex: sync.RWMutex{}, scheduleDuration: time.Second, internalNotify: make(chan struct{}, 1), - childrenNotify: make(chan struct{}, 1), meta: meta, builder: builder, ic: ic, @@ -99,7 +89,6 @@ func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable, func (fsw *flushedSegmentWatcher) reloadFromKV() error { log.Info("flushSegmentWatcher reloadFromKV") fsw.internalTasks = make(map[UniqueID]*internalTask) - fsw.childrenTasks = make(map[int64]map[int64]*childrenTask) _, values, version, err := fsw.kvClient.LoadWithRevision(util.FlushedSegmentPrefix) if err != nil { log.Error("flushSegmentWatcher reloadFromKV fail", zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err)) @@ -120,9 +109,6 @@ func (fsw *flushedSegmentWatcher) reloadFromKV() error { func (fsw *flushedSegmentWatcher) Start() { fsw.wg.Add(1) go fsw.internalScheduler() - - fsw.wg.Add(1) - go fsw.childrenScheduler() } func (fsw *flushedSegmentWatcher) Stop() { @@ -131,57 +117,20 @@ func (fsw *flushedSegmentWatcher) Stop() { } func (fsw *flushedSegmentWatcher) enqueueInternalTask(segmentID UniqueID) { + defer fsw.internalNotifyFunc() fsw.internalTaskMutex.Lock() + defer fsw.internalTaskMutex.Unlock() - fsw.internalTasks[segmentID] = &internalTask{ - state: indexTaskInit, - segmentInfo: nil, - } - fsw.internalTaskMutex.Unlock() - - fsw.prepare(segmentID) - fsw.internalNotifyFunc() -} - -func (fsw *flushedSegmentWatcher) enqueueChildrenTask(segmentInfo *datapb.SegmentInfo, index *model.Index) { - fsw.childrenTaskMutex.Lock() - defer fsw.childrenTaskMutex.Unlock() + logutil.Logger(fsw.ctx).Info("flushedSegmentWatcher enqueueInternalTask", zap.Int64("segmentID", segmentID)) - if _, ok := fsw.childrenTasks[segmentInfo.ID]; !ok { - fsw.childrenTasks[segmentInfo.ID] = make(map[int64]*childrenTask) - } - fsw.childrenTasks[segmentInfo.ID][index.IndexID] = &childrenTask{ - internalTask: internalTask{ - state: indexTaskInit, - segmentInfo: segmentInfo, - }, - indexInfo: &querypb.FieldIndexInfo{ - FieldID: index.FieldID, - EnableIndex: true, - IndexName: index.IndexName, - IndexID: index.IndexID, - IndexParams: index.IndexParams, - }, - } - log.Debug("IndexCoord flushedSegmentWatcher create index for flushed segment", - zap.Int64("segID", segmentInfo.ID), zap.Int64("indexID", index.IndexID)) - hasIndex, buildID := fsw.meta.HasSameIndex(segmentInfo.ID, index.IndexID) - if hasIndex { - fsw.childrenTasks[segmentInfo.ID][index.IndexID].indexInfo.BuildID = buildID - state := fsw.meta.GetSegmentIndexState(segmentInfo.ID, index.IndexID) - switch state.state { - case commonpb.IndexState_IndexStateNone: - fsw.childrenTasks[segmentInfo.ID][index.IndexID].state = indexTaskInit - case commonpb.IndexState_InProgress, commonpb.IndexState_Unissued, commonpb.IndexState_Retry: - fsw.childrenTasks[segmentInfo.ID][index.IndexID].state = indexTaskInProgress - - case 
commonpb.IndexState_Finished, commonpb.IndexState_Failed: - fsw.childrenTasks[segmentInfo.ID][index.IndexID].state = indexTaskDone - default: - // can not to here + if _, ok := fsw.internalTasks[segmentID]; !ok { + fsw.internalTasks[segmentID] = &internalTask{ + state: indexTaskPrepare, + segmentInfo: nil, } + return } - fsw.childrenNotifyFunc() + logutil.Logger(fsw.ctx).Info("flushedSegmentWatcher already have the task", zap.Int64("segmentID", segmentID)) } func (fsw *flushedSegmentWatcher) internalScheduler() { @@ -204,26 +153,6 @@ func (fsw *flushedSegmentWatcher) internalScheduler() { } } -func (fsw *flushedSegmentWatcher) childrenScheduler() { - log.Info("IndexCoord flushedSegmentWatcher childrenScheduler start...") - defer fsw.wg.Done() - - ticker := time.NewTicker(fsw.scheduleDuration) - defer ticker.Stop() - - for { - select { - case <-fsw.ctx.Done(): - log.Warn("IndexCoord flushedSegmentWatcher context done") - return - case <-ticker.C: - fsw.childrenRun() - case <-fsw.childrenNotify: - fsw.childrenRun() - } - } -} - func (fsw *flushedSegmentWatcher) internalRun() { fsw.internalTaskMutex.RLock() segmentIDs := make([]UniqueID, 0, len(fsw.internalTasks)) @@ -238,30 +167,8 @@ func (fsw *flushedSegmentWatcher) internalRun() { } fsw.internalTaskMutex.RUnlock() - for _, segID := range segmentIDs { - fsw.internalProcess(segID) - break - } -} - -func (fsw *flushedSegmentWatcher) childrenRun() { - fsw.childrenTaskMutex.RLock() - segmentIDs := make([]UniqueID, 0, len(fsw.childrenTasks)) - if len(fsw.childrenTasks) > 0 { - log.Debug("IndexCoord flushedSegmentWatcher schedule children tasks", zap.Int("children task num", len(fsw.childrenTasks))) - for segID := range fsw.childrenTasks { - segmentIDs = append(segmentIDs, segID) - } - sort.Slice(segmentIDs, func(i, j int) bool { - return segmentIDs[i] < segmentIDs[j] - }) - } - fsw.childrenTaskMutex.RUnlock() - for _, segID := range segmentIDs { - tasks := fsw.getChildrenTasks(segID) - for _, t := range tasks { - fsw.childrenProcess(t) - } + for _, segmentID := range segmentIDs { + fsw.internalProcess(segmentID) } } @@ -272,16 +179,10 @@ func (fsw *flushedSegmentWatcher) internalNotifyFunc() { } } -func (fsw *flushedSegmentWatcher) childrenNotifyFunc() { - select { - case fsw.childrenNotify <- struct{}{}: - default: - } -} - func (fsw *flushedSegmentWatcher) updateInternalTaskState(segID UniqueID, state indexTaskState) { fsw.internalTaskMutex.Lock() defer fsw.internalTaskMutex.Unlock() + log.Debug("flushedSegmentWatcher updateInternalTaskState", zap.Int64("segID", segID), zap.String("state", state.String())) if _, ok := fsw.internalTasks[segID]; ok { fsw.internalTasks[segID].state = state } @@ -292,6 +193,7 @@ func (fsw *flushedSegmentWatcher) deleteInternalTask(segID UniqueID) { defer fsw.internalTaskMutex.Unlock() delete(fsw.internalTasks, segID) + log.Debug("flushedSegmentWatcher delete the internal task", zap.Int64("segID", segID)) } func (fsw *flushedSegmentWatcher) getInternalTask(segID UniqueID) *internalTask { @@ -311,76 +213,56 @@ func (fsw *flushedSegmentWatcher) setInternalTaskSegmentInfo(segID UniqueID, seg if _, ok := fsw.internalTasks[segID]; ok { fsw.internalTasks[segID].segmentInfo = segInfo } + log.Debug("flushedSegmentWatcher set internal task segment info success", zap.Int64("segID", segID)) } -func (fsw *flushedSegmentWatcher) updateChildrenTaskState(segID, indexID UniqueID, state indexTaskState) { - fsw.childrenTaskMutex.Lock() - defer fsw.childrenTaskMutex.Unlock() - - if tasks, ok := fsw.childrenTasks[segID]; ok { - if _, 
ok = tasks[indexID]; ok { - fsw.childrenTasks[segID][indexID].state = state - - } - } -} - -func (fsw *flushedSegmentWatcher) hasChildrenTaskDone(segID UniqueID) bool { - fsw.childrenTaskMutex.RLock() - defer fsw.childrenTaskMutex.RUnlock() - if tasks, ok := fsw.childrenTasks[segID]; !ok || len(tasks) == 0 { - return true - } - return false -} - -func (fsw *flushedSegmentWatcher) getChildrenTasks(segID UniqueID) map[UniqueID]*childrenTask { - fsw.childrenTaskMutex.RLock() - defer fsw.childrenTaskMutex.RUnlock() - tasks := make(map[UniqueID]*childrenTask) - if ts, ok := fsw.childrenTasks[segID]; ok { - for k, v := range ts { - tasks[k] = v +func (fsw *flushedSegmentWatcher) allParentsDone(segIDs []UniqueID) bool { + fsw.internalTaskMutex.RLock() + defer fsw.internalTaskMutex.RUnlock() + done := true + for _, segID := range segIDs { + if _, ok := fsw.internalTasks[segID]; ok { + done = false + break } } - return tasks -} - -func (fsw *flushedSegmentWatcher) deleteChildTask(segID, indexID UniqueID) { - fsw.childrenTaskMutex.Lock() - defer fsw.childrenTaskMutex.Unlock() - if _, ok := fsw.childrenTasks[segID]; ok { - delete(fsw.childrenTasks[segID], indexID) - } -} - -func (fsw *flushedSegmentWatcher) deleteChildrenTask(segID UniqueID) { - fsw.childrenTaskMutex.Lock() - defer fsw.childrenTaskMutex.Unlock() - - delete(fsw.childrenTasks, segID) + return done } func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) { - // first pull segmentInfo - if err := fsw.pullSegmentInfo(segID); err != nil { - return - } t := fsw.getInternalTask(segID) log.Debug("IndexCoord flushedSegmentWatcher process internal task", zap.Int64("segID", segID), zap.String("state", t.state.String())) switch t.state { + case indexTaskPrepare: + if err := fsw.prepare(segID); err != nil { + log.Error("flushedSegmentWatcher prepare internal task fail", zap.Int64("segID", segID), zap.Error(err)) + return + } + fsw.updateInternalTaskState(segID, indexTaskInit) case indexTaskInit: - fsw.constructTask(t) + if err := fsw.constructTask(t); err != nil { + log.Error("flushedSegmentWatcher construct task fail", zap.Int64("segID", segID), zap.Error(err)) + return + } fsw.updateInternalTaskState(segID, indexTaskInProgress) fsw.internalNotifyFunc() case indexTaskInProgress: - if fsw.hasChildrenTaskDone(segID) { + state := fsw.meta.GetSegmentIndexState(segID) + if state.state == commonpb.IndexState_Finished || state.state == commonpb.IndexState_Failed || state.state == commonpb.IndexState_IndexStateNone { + log.Debug("all tasks are finished", zap.Int64("segID", segID), zap.String("state", state.state.String())) fsw.updateInternalTaskState(segID, indexTaskDone) fsw.internalNotifyFunc() } case indexTaskDone: + if !fsw.allParentsDone(t.segmentInfo.CompactionFrom) { + log.Debug("flushed segment create index done, but there are still parent task that haven't written handoff event", + zap.Int64("segID", segID), zap.Int64s("compactionFrom", t.segmentInfo.CompactionFrom)) + return + } + indexInfos := fsw.meta.GetSegmentIndexes(segID) + enableIndex := len(indexInfos) > 0 handoffTask := &querypb.SegmentInfo{ SegmentID: segID, CollectionID: t.segmentInfo.CollectionID, @@ -392,11 +274,24 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) { CompactionFrom: t.segmentInfo.CompactionFrom, CreatedByCompaction: t.segmentInfo.CreatedByCompaction, SegmentState: t.segmentInfo.State, - IndexInfos: nil, - EnableIndex: false, + IndexInfos: make([]*querypb.FieldIndexInfo, 0), + EnableIndex: enableIndex, + } + for _, indexInfo := range 
indexInfos { + handoffTask.IndexInfos = append(handoffTask.IndexInfos, &querypb.FieldIndexInfo{ + FieldID: fsw.meta.GetFieldIDByIndexID(t.segmentInfo.CollectionID, indexInfo.IndexID), + EnableIndex: true, + IndexName: fsw.meta.GetIndexNameByID(t.segmentInfo.CollectionID, indexInfo.IndexID), + IndexID: indexInfo.IndexID, + BuildID: indexInfo.BuildID, + IndexParams: fsw.meta.GetIndexParams(t.segmentInfo.CollectionID, indexInfo.IndexID), + //IndexFilePaths: nil, + //IndexSize: 0, + }) } + if err := fsw.writeHandoffSegment(handoffTask); err != nil { - log.Error("IndexCoord flushSegmentWatcher writeHandoffSegment with no index fail", + log.Error("IndexCoord flushSegmentWatcher writeHandoffSegment with no index info fail", zap.Int64("segID", segID), zap.Error(err)) return } @@ -407,119 +302,48 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) { } fsw.deleteInternalTask(segID) fsw.internalNotifyFunc() - case indexTaskDeleted: - if t.segmentInfo.CreatedByCompaction { - fsw.removeCompactedTasks(t) - } - fsw.updateInternalTaskState(segID, indexTaskDone) default: - log.Debug("IndexCoord flushedSegmentWatcher internal task get invalid state", zap.Int64("segID", t.segmentInfo.ID), + log.Debug("IndexCoord flushedSegmentWatcher internal task get invalid state", zap.Int64("segID", segID), zap.String("state", t.state.String())) } } -func (fsw *flushedSegmentWatcher) childrenProcess(task *childrenTask) { - log.Debug("IndexCoord flushedSegmentWatcher process child task", zap.Int64("segID", task.segmentInfo.ID), - zap.Int64("indexID", task.indexInfo.IndexID), zap.String("state", task.state.String())) - segID := task.segmentInfo.ID - switch task.state { - case indexTaskInit: - //get binLogs - binLogs := make([]string, 0) - for _, fieldBinLog := range task.segmentInfo.Binlogs { - if fieldBinLog.GetFieldID() == task.indexInfo.FieldID { - for _, binLog := range fieldBinLog.GetBinlogs() { - binLogs = append(binLogs, binLog.LogPath) - } - break - } - } +func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error { + log.Debug("IndexCoord flushedSegmentWatcher construct tasks by segment info", zap.Int64("segID", t.segmentInfo.ID), + zap.Int64s("compactionFrom", t.segmentInfo.CompactionFrom)) + fieldIndexes := fsw.meta.GetIndexesForCollection(t.segmentInfo.CollectionID, "") + if len(fieldIndexes) == 0 { + log.Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID), + zap.Int64("num of rows", t.segmentInfo.NumOfRows), zap.Int("collection indexes num", len(fieldIndexes))) + // no need to build index + return nil + } + + for _, index := range fieldIndexes { segIdx := &model.SegmentIndex{ - SegmentID: segID, - CollectionID: task.segmentInfo.CollectionID, - PartitionID: task.segmentInfo.PartitionID, - NumRows: task.segmentInfo.NumOfRows, - IndexID: task.indexInfo.IndexID, - CreateTime: task.segmentInfo.StartPosition.Timestamp, + SegmentID: t.segmentInfo.ID, + CollectionID: t.segmentInfo.CollectionID, + PartitionID: t.segmentInfo.PartitionID, + NumRows: t.segmentInfo.NumOfRows, + IndexID: index.IndexID, + CreateTime: t.segmentInfo.StartPosition.Timestamp, } //create index task for metaTable // send to indexBuilder have, buildID, err := fsw.ic.createIndexForSegment(segIdx) if err != nil { - log.Warn("IndexCoord create index for segment fail", zap.Int64("segID", segID), - zap.Int64("indexID", task.indexInfo.IndexID), zap.Error(err)) - return + log.Warn("IndexCoord create index for segment fail", zap.Int64("segID", t.segmentInfo.ID), + zap.Int64("indexID", index.IndexID), 
zap.Error(err)) + return err } if !have { fsw.builder.enqueue(buildID) } - - fsw.updateChildrenTaskState(segID, task.indexInfo.IndexID, indexTaskInProgress) - fsw.childrenNotifyFunc() - case indexTaskInProgress: - state := fsw.meta.GetSegmentIndexState(task.segmentInfo.ID, task.indexInfo.IndexID) - if state.state == commonpb.IndexState_IndexStateNone { - log.Debug("task is no need to build index, remove task", zap.Int64("segID", task.segmentInfo.ID), - zap.Int64("indexID", task.indexInfo.IndexID)) - fsw.deleteChildTask(task.segmentInfo.ID, task.indexInfo.IndexID) - fsw.childrenNotifyFunc() - return - } - if state.state != commonpb.IndexState_Finished && state.state != commonpb.IndexState_Failed { - log.Debug("the index on segment is not finish", zap.Int64("segID", segID), - zap.String("state", state.state.String()), zap.String("fail reason", state.failReason)) - return - } - // don't set index files, QueryCoord get index files from IndexCoord by grpc. - //fsw.childrenTasks[segID][task.indexInfo.IndexID].indexInfo.IndexFilePaths = filePath.IndexFilePaths - //fsw.childrenTasks[segID][task.indexInfo.IndexID].indexInfo.IndexSize = int64(filePath.SerializedSize) - fsw.updateChildrenTaskState(segID, task.indexInfo.IndexID, indexTaskDone) - fsw.childrenNotifyFunc() - case indexTaskDone: - handoffTask := &querypb.SegmentInfo{ - SegmentID: task.segmentInfo.ID, - CollectionID: task.segmentInfo.CollectionID, - PartitionID: task.segmentInfo.PartitionID, - NumRows: task.segmentInfo.NumOfRows, - DmChannel: task.segmentInfo.GetInsertChannel(), - IndexName: task.indexInfo.IndexName, - IndexID: task.indexInfo.IndexID, - CompactionFrom: task.segmentInfo.CompactionFrom, - CreatedByCompaction: task.segmentInfo.CreatedByCompaction, - SegmentState: task.segmentInfo.State, - IndexInfos: []*querypb.FieldIndexInfo{task.indexInfo}, - EnableIndex: true, - } - if err := fsw.writeHandoffSegment(handoffTask); err != nil { - log.Warn("IndexCoord writeHandoffSegment fail, wait to retry", zap.Int64("collID", task.segmentInfo.CollectionID), - zap.Int64("partID", task.segmentInfo.PartitionID), zap.Int64("segID", task.segmentInfo.ID), zap.Error(err)) - return - } - log.Debug("IndexCoord writeHandoffSegment success", zap.Int64("collID", task.segmentInfo.CollectionID), - zap.Int64("partID", task.segmentInfo.PartitionID), zap.Int64("segID", task.segmentInfo.ID)) - fsw.deleteChildTask(task.segmentInfo.ID, task.indexInfo.IndexID) - fsw.childrenNotifyFunc() - default: - log.Debug("IndexCoord flushedSegmentWatcher internal task get invalid state", zap.Int64("segID", task.segmentInfo.ID), - zap.String("state", task.state.String())) - } -} - -func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) { - log.Debug("IndexCoord flushedSegmentWatcher construct tasks by segment info", zap.Int64("segID", t.segmentInfo.ID), - zap.Int64s("compactionFrom", t.segmentInfo.CompactionFrom)) - fieldIndexes := fsw.meta.GetIndexesForCollection(t.segmentInfo.CollectionID, "") - if len(fieldIndexes) == 0 { - log.Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID), - zap.Int64("num of rows", t.segmentInfo.NumOfRows), zap.Int("collection indexes num", len(fieldIndexes))) - // no need to build index - return - } - - for _, index := range fieldIndexes { - fsw.enqueueChildrenTask(t.segmentInfo, index) } + log.Debug("flushedSegmentWatcher construct children task success", zap.Int64("segID", t.segmentInfo.ID), + zap.Int("tasks num", len(fieldIndexes))) + return nil } func (fsw *flushedSegmentWatcher) writeHandoffSegment(t 
*querypb.SegmentInfo) error { @@ -587,30 +411,40 @@ func (fsw *flushedSegmentWatcher) pullSegmentInfo(segmentID UniqueID) error { return errors.New(errMsg) } -func (fsw *flushedSegmentWatcher) prepare(segID UniqueID) { +func (fsw *flushedSegmentWatcher) prepare(segID UniqueID) error { + defer fsw.internalNotifyFunc() + log.Debug("prepare flushed segment task", zap.Int64("segID", segID)) if err := fsw.pullSegmentInfo(segID); err != nil { log.Error("flushedSegmentWatcher get segment info fail", zap.Int64("segID", segID), zap.Error(err)) if errors.Is(err, ErrSegmentNotFound) { fsw.deleteInternalTask(segID) - return + return err } - return - } - t := fsw.getInternalTask(segID) - if t.segmentInfo.CreatedByCompaction { - fsw.removeCompactedTasks(t) + return err } + //t := fsw.getInternalTask(segID) + //if t.segmentInfo.CreatedByCompaction { + // if err := fsw.removeCompactedTasks(t); err != nil { + // return err + // } + //} + return nil } -func (fsw *flushedSegmentWatcher) removeCompactedTasks(t *internalTask) { - log.Debug("IndexCoord flushedSegmentWatcher mark task as deleted which is compacted", zap.Int64("segID", t.segmentInfo.ID), - zap.Int64s("compactionFrom", t.segmentInfo.CompactionFrom)) - fsw.builder.markTasksAsDeleted(fsw.meta.GetBuildIDsFromSegIDs(t.segmentInfo.CompactionFrom)) - for _, segID := range t.segmentInfo.CompactionFrom { - fsw.deleteChildrenTask(segID) - if _, ok := fsw.internalTasks[segID]; ok { - fsw.updateInternalTaskState(segID, indexTaskDeleted) - } - } -} +//func (fsw *flushedSegmentWatcher) removeCompactedTasks(t *internalTask) error { +// log.Debug("IndexCoord flushedSegmentWatcher mark task as deleted which is compacted", zap.Int64("segID", t.segmentInfo.ID), +// zap.Int64s("compactionFrom", t.segmentInfo.CompactionFrom)) +// if err := fsw.builder.markTasksAsDeleted(fsw.meta.GetBuildIDsFromSegIDs(t.segmentInfo.CompactionFrom)); err != nil { +// log.Error("mark index meta fail, try again", zap.Int64s("compacted segIDs", t.segmentInfo.CompactionFrom), +// zap.Error(err)) +// return err +// } +// for _, segID := range t.segmentInfo.CompactionFrom { +// fsw.deleteChildrenTask(segID) +// if _, ok := fsw.internalTasks[segID]; ok { +// fsw.updateInternalTaskState(segID, indexTaskDeleted) +// } +// } +// return nil +//} diff --git a/internal/indexcoord/flushed_segment_watcher_test.go b/internal/indexcoord/flushed_segment_watcher_test.go index 44f35142f18e7..d22ae40da8444 100644 --- a/internal/indexcoord/flushed_segment_watcher_test.go +++ b/internal/indexcoord/flushed_segment_watcher_test.go @@ -22,6 +22,8 @@ import ( "sync" "testing" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus/internal/metastore/kv/indexcoord" @@ -29,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/querypb" ) func Test_flushSegmentWatcher(t *testing.T) { @@ -94,299 +95,279 @@ func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) { }) } -func Test_flushSegmentWatcher_prepare(t *testing.T) { +func Test_flushSegmentWatcher_internalProcess_success(t *testing.T) { + meta := &metaTable{ + segmentIndexLock: sync.RWMutex{}, + indexLock: sync.RWMutex{}, + catalog: &indexcoord.Catalog{Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return nil + }, + }}, + collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{ + collID: { + 
indexID: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: indexName, + IsDeleted: false, + CreateTime: 1, + TypeParams: nil, + IndexParams: nil, + }, + }, + }, + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ + segID: { + indexID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1000, + IndexID: indexID, + BuildID: buildID, + IndexState: commonpb.IndexState_Finished, + }, + }, + }, + buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ + buildID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1000, + IndexID: indexID, + BuildID: buildID, + IndexState: commonpb.IndexState_Finished, + }, + }, + } task := &internalTask{ - state: indexTaskInit, + state: indexTaskPrepare, segmentInfo: nil, } - t.Run("success", func(t *testing.T) { - fsw := &flushedSegmentWatcher{ - ic: &IndexCoord{ - dataCoordClient: &DataCoordMock{ - CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { - return &datapb.GetSegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - Infos: []*datapb.SegmentInfo{ - { - ID: segID + 100, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - CreatedByCompaction: true, - CompactionFrom: []int64{segID}, + + fsw := &flushedSegmentWatcher{ + ic: &IndexCoord{ + dataCoordClient: &DataCoordMock{ + CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { + return &datapb.GetSegmentInfoResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Infos: []*datapb.SegmentInfo{ + { + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + CreatedByCompaction: true, + CompactionFrom: []int64{}, + StartPosition: &internalpb.MsgPosition{ + ChannelName: "", + MsgID: nil, + MsgGroup: "", + Timestamp: 1, }, }, - }, nil - }, - }, - }, - internalTasks: map[UniqueID]*internalTask{ - segID: task, - segID + 100: {state: indexTaskInit, segmentInfo: nil}, - }, - meta: &metaTable{ - segmentIndexLock: sync.RWMutex{}, - segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ - segID: { - indexID: { - SegmentID: segID, - CollectionID: collID, - PartitionID: partID, - NumRows: 1000, - IndexID: indexID, - BuildID: buildID, }, - }, + }, nil }, }, - builder: &indexBuilder{ - taskMutex: sync.RWMutex{}, - scheduleDuration: 0, - tasks: nil, - notifyChan: nil, + metaTable: meta, + }, + internalTasks: map[UniqueID]*internalTask{ + segID: task, + }, + meta: meta, + builder: &indexBuilder{ + taskMutex: sync.RWMutex{}, + scheduleDuration: 0, + tasks: map[int64]indexTaskState{}, + notifyChan: nil, + meta: meta, + }, + kvClient: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return nil }, - } - fsw.prepare(segID + 100) - // idempotent - fsw.prepare(segID + 100) - }) - t.Run("init task get segmentInfo fail", func(t *testing.T) { - fsw := &flushedSegmentWatcher{ - ic: &IndexCoord{ - dataCoordClient: &DataCoordMock{ - CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { - return nil, errors.New("error") - }, - }, + save: func(s string, s2 string) error { + return nil }, - internalTasks: map[UniqueID]*internalTask{ - segID: task, + removeWithPrefix: func(key string) error { 
+ return nil }, - } - fsw.prepare(segID) + }, + } + t.Run("prepare", func(t *testing.T) { fsw.internalProcess(segID) - fsw.ic.dataCoordClient = &DataCoordMock{ - CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { - return &datapb.GetSegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "fail reason", - }, - }, nil + fsw.internalTaskMutex.RLock() + assert.Equal(t, indexTaskInit, fsw.internalTasks[segID].state) + fsw.internalTaskMutex.RUnlock() + }) + + t.Run("init", func(t *testing.T) { + fsw.internalProcess(segID) + fsw.internalTaskMutex.RLock() + assert.Equal(t, indexTaskInProgress, fsw.internalTasks[segID].state) + fsw.internalTaskMutex.RUnlock() + }) + + err := fsw.meta.FinishTask(&indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Finished, + IndexFiles: []string{"file1", "file2"}, + SerializedSize: 100, + FailReason: "", + }) + assert.NoError(t, err) + + t.Run("inProgress", func(t *testing.T) { + fsw.internalProcess(segID) + fsw.internalTaskMutex.RLock() + assert.Equal(t, indexTaskDone, fsw.internalTasks[segID].state) + fsw.internalTaskMutex.RUnlock() + }) + + t.Run("done", func(t *testing.T) { + fsw.internalProcess(segID) + fsw.internalTaskMutex.RLock() + _, ok := fsw.internalTasks[segID] + assert.False(t, ok) + fsw.internalTaskMutex.RUnlock() + }) +} + +func Test_flushSegmentWatcher_internalProcess_error(t *testing.T) { + task := &internalTask{ + state: indexTaskPrepare, + segmentInfo: nil, + } + + fsw := &flushedSegmentWatcher{ + ic: &IndexCoord{ + dataCoordClient: &DataCoordMock{ + CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { + return &datapb.GetSegmentInfoResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Infos: []*datapb.SegmentInfo{ + { + ID: segID + 100, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + CreatedByCompaction: true, + CompactionFrom: []int64{segID}, + }, + }, + }, nil + }, }, - } - fsw.prepare(segID) + metaTable: &metaTable{}, + }, + internalTasks: map[UniqueID]*internalTask{ + segID: task, + }, + meta: &metaTable{}, + builder: &indexBuilder{}, + } + t.Run("fail", func(t *testing.T) { fsw.ic.dataCoordClient = &DataCoordMock{ CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { - return &datapb.GetSegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgSegmentNotFound(segID), - }, - }, nil + return nil, errors.New("error") }, } - fsw.prepare(segID) - _, ok := fsw.internalTasks[segID] - assert.False(t, ok) + fsw.internalProcess(segID) fsw.ic.dataCoordClient = &DataCoordMock{ CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { return &datapb.GetSegmentInfoResponse{ Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - Infos: []*datapb.SegmentInfo{ - { - ID: segID + 100, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 10000, - }, + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "fail reason", }, }, nil }, } - fsw.internalTasks = map[UniqueID]*internalTask{ - segID: task, - } - fsw.prepare(segID) + fsw.internalProcess(segID) + fsw.internalTaskMutex.RLock() + assert.Equal(t, 
indexTaskPrepare, fsw.internalTasks[segID].state) + fsw.internalTaskMutex.RUnlock() }) - t.Run("done task write handoff event fail", func(t *testing.T) { - task := &internalTask{ - state: indexTaskDone, - segmentInfo: &datapb.SegmentInfo{ - ID: 0, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "", - NumOfRows: 0, - State: 0, - MaxRowNum: 0, - LastExpireTime: 0, - StartPosition: nil, - DmlPosition: nil, - Binlogs: nil, - Statslogs: nil, - Deltalogs: nil, - CreatedByCompaction: false, - CompactionFrom: nil, - DroppedAt: 0, + t.Run("write handoff event fail", func(t *testing.T) { + fsw.kvClient = &mockETCDKV{ + save: func(s string, s2 string) error { + return errors.New("error") }, } - fsw := &flushedSegmentWatcher{ - ic: &IndexCoord{ - dataCoordClient: NewDataCoordMock(), - }, - kvClient: &mockETCDKV{ - save: func(s string, s2 string) error { - return errors.New("error") + fsw.internalTasks = map[UniqueID]*internalTask{ + segID: { + state: indexTaskDone, + segmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 0, + State: 0, + MaxRowNum: 0, + LastExpireTime: 0, + StartPosition: nil, + DmlPosition: nil, + Binlogs: nil, + Statslogs: nil, + Deltalogs: nil, + CreatedByCompaction: false, + CompactionFrom: nil, + DroppedAt: 0, }, }, - internalTasks: map[UniqueID]*internalTask{ - segID: task, - }, } - fsw.prepare(segID) fsw.internalProcess(segID) + fsw.internalTaskMutex.RLock() + assert.Equal(t, indexTaskDone, fsw.internalTasks[segID].state) + fsw.internalTaskMutex.RUnlock() }) - t.Run("done task remove flush segment fail", func(t *testing.T) { - task := &internalTask{ - state: indexTaskDone, - segmentInfo: &datapb.SegmentInfo{ - ID: 0, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "", - NumOfRows: 0, - State: 0, - MaxRowNum: 0, - LastExpireTime: 0, - StartPosition: nil, - DmlPosition: nil, - Binlogs: nil, - Statslogs: nil, - Deltalogs: nil, - CreatedByCompaction: false, - CompactionFrom: nil, - DroppedAt: 0, - }, - } - fsw := &flushedSegmentWatcher{ - ic: &IndexCoord{ - dataCoordClient: NewDataCoordMock(), - }, - kvClient: &mockETCDKV{ - save: func(s string, s2 string) error { - return nil - }, - removeWithPrefix: func(s string) error { - return errors.New("error") - }, + t.Run("remove flushed segment fail", func(t *testing.T) { + fsw.kvClient = &mockETCDKV{ + save: func(s string, s2 string) error { + return nil }, - internalTasks: map[UniqueID]*internalTask{ - segID: task, + removeWithPrefix: func(key string) error { + return errors.New("error") }, } - - fsw.prepare(segID) fsw.internalProcess(segID) + fsw.internalTaskMutex.RLock() + assert.Equal(t, indexTaskDone, fsw.internalTasks[segID].state) + fsw.internalTaskMutex.RUnlock() }) -} - -func Test_flushSegmentWatcher_childrenProcess_error(t *testing.T) { - task := &childrenTask{ - internalTask: internalTask{ - state: indexTaskInProgress, - segmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: 0, - PartitionID: 0, - InsertChannel: "", - NumOfRows: 0, - State: 0, - MaxRowNum: 0, - LastExpireTime: 0, - StartPosition: &internalpb.MsgPosition{ - Timestamp: 1, - }, - }, - }, - indexInfo: &querypb.FieldIndexInfo{ - FieldID: 0, - EnableIndex: true, - IndexName: "", - IndexID: indexID, - BuildID: buildID, - IndexParams: nil, - IndexFilePaths: nil, - IndexSize: 0, - }, - } - - t.Run("inProgress task not finish", func(t *testing.T) { - fsw := &flushedSegmentWatcher{ - ic: &IndexCoord{ - dataCoordClient: NewDataCoordMock(), - }, - childrenTasks: 
map[UniqueID]map[UniqueID]*childrenTask{ - segID: { - indexID: task, - }, - }, - meta: &metaTable{ - segmentIndexLock: sync.RWMutex{}, - segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ - segID: { - indexID: &model.SegmentIndex{ - SegmentID: segID, - CollectionID: 0, - PartitionID: 0, - NumRows: 0, - IndexID: indexID, - BuildID: buildID, - NodeID: 1, - IndexVersion: 1, - IndexState: commonpb.IndexState_InProgress, - FailReason: "", - IsDeleted: false, - CreateTime: 0, - IndexFilePaths: nil, - IndexSize: 0, - }, - }, - }, - }, - } - fsw.childrenProcess(task) + t.Run("index is not zero", func(t *testing.T) { + fsw.internalProcess(segID) + fsw.internalTaskMutex.RLock() + assert.Equal(t, indexTaskDone, fsw.internalTasks[segID].state) + fsw.internalTaskMutex.RUnlock() }) - t.Run("inProgress not in meta", func(t *testing.T) { - fsw := &flushedSegmentWatcher{ - ic: &IndexCoord{ - dataCoordClient: NewDataCoordMock(), - }, - childrenTasks: map[UniqueID]map[UniqueID]*childrenTask{ - segID: { - indexID: task, - }, - }, - meta: &metaTable{ - segmentIndexLock: sync.RWMutex{}, - segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, + t.Run("invalid state", func(t *testing.T) { + fsw.internalTasks = map[UniqueID]*internalTask{ + segID: { + state: indexTaskDeleted, + segmentInfo: nil, }, } - - fsw.childrenProcess(task) + fsw.internalProcess(segID) }) } diff --git a/internal/indexcoord/garbage_collector.go b/internal/indexcoord/garbage_collector.go index debddf497fae2..495551bd2b46a 100644 --- a/internal/indexcoord/garbage_collector.go +++ b/internal/indexcoord/garbage_collector.go @@ -22,12 +22,13 @@ import ( "sync" "time" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" ) @@ -39,8 +40,6 @@ type garbageCollector struct { gcFileDuration time.Duration gcMetaDuration time.Duration - notify chan struct{} - metaTable *metaTable chunkManager storage.ChunkManager indexCoordClient *IndexCoord @@ -75,13 +74,6 @@ func (gc *garbageCollector) Stop() { gc.wg.Wait() } -func (gc *garbageCollector) Notify() { - select { - case gc.notify <- struct{}{}: - default: - } -} - func (gc *garbageCollector) recycleUnusedIndexes() { defer gc.wg.Done() log.Info("IndexCoord garbageCollector recycleUnusedIndexes start") @@ -122,6 +114,8 @@ func (gc *garbageCollector) recycleUnusedIndexes() { zap.Int64("nodeID", segIdx.NodeID), zap.Error(err)) continue } + log.Info("IndexCoord remove segment index meta success", zap.Int64("buildID", segIdx.BuildID), + zap.Int64("nodeID", segIdx.NodeID)) } } } @@ -170,11 +164,9 @@ func (gc *garbageCollector) recycleSegIndexesMeta() { } } } - + //segIndexes := gc.metaTable.GetDeletedSegmentIndexes() for _, meta := range segIndexes { if meta.IsDeleted || gc.metaTable.IsIndexDeleted(meta.CollectionID, meta.IndexID) { - log.Info("index meta need to recycle it", zap.Int64("buildID", meta.BuildID), - zap.Int64("nodeID", meta.NodeID)) if meta.NodeID != 0 { // wait for releasing reference lock continue @@ -187,6 +179,7 @@ func (gc *garbageCollector) recycleSegIndexesMeta() { zap.Int64("nodeID", meta.NodeID), zap.Error(err)) continue } + log.Debug("index meta recycle success", zap.Int64("buildID", meta.BuildID)) } } } @@ -205,8 +198,6 @@ func (gc *garbageCollector) 
recycleUnusedSegIndexes() { return case <-ticker.C: gc.recycleSegIndexesMeta() - case <-gc.notify: - gc.recycleSegIndexesMeta() } } } diff --git a/internal/indexcoord/garbage_collector_test.go b/internal/indexcoord/garbage_collector_test.go index c7e20f1cf4b79..14f9e97bd60ec 100644 --- a/internal/indexcoord/garbage_collector_test.go +++ b/internal/indexcoord/garbage_collector_test.go @@ -16,315 +16,369 @@ package indexcoord -//func TestGarbageCollector_Start(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// gc := &garbageCollector{ -// ctx: ctx, -// cancel: cancel, -// wg: sync.WaitGroup{}, -// gcFileDuration: time.Millisecond * 300, -// gcMetaDuration: time.Millisecond * 300, -// metaTable: &metaTable{}, -// chunkManager: &chunkManagerMock{ -// removeWithPrefix: func(s string) error { -// return nil -// }, -// listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { -// return []string{}, []time.Time{}, nil -// }, -// remove: func(s string) error { -// return nil -// }, -// }, -// } -// -// gc.Start() -// gc.Stop() -//} -// -//func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { -// t.Run("index not in meta and remove with prefix failed", func(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// gc := &garbageCollector{ -// ctx: ctx, -// cancel: cancel, -// wg: sync.WaitGroup{}, -// gcFileDuration: time.Millisecond * 300, -// gcMetaDuration: time.Millisecond * 300, -// metaTable: constructMetaTable(&catalog.Catalog{ -// Txn: &mockETCDKV{ -// remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// }), -// chunkManager: &chunkManagerMock{ -// removeWithPrefix: func(s string) error { -// return fmt.Errorf("error") -// }, -// listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { -// if !recursive { -// return []string{"a/b/1/", "a/b/2/"}, []time.Time{{}, {}}, nil -// } -// return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil -// }, -// remove: func(s string) error { -// return nil -// }, -// }, -// } -// -// gc.wg.Add(1) -// go gc.recycleUnusedIndexFiles() -// time.Sleep(time.Second) -// cancel() -// gc.wg.Wait() -// }) -// -// t.Run("load dir failed", func(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// gc := &garbageCollector{ -// ctx: ctx, -// cancel: cancel, -// wg: sync.WaitGroup{}, -// gcFileDuration: time.Millisecond * 300, -// gcMetaDuration: time.Millisecond * 300, -// metaTable: constructMetaTable(&catalog.Catalog{ -// Txn: &mockETCDKV{ -// remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// }), -// chunkManager: &chunkManagerMock{ -// removeWithPrefix: func(s string) error { -// return nil -// }, -// listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { -// if !recursive { -// return nil, nil, fmt.Errorf("error") -// } -// return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil -// }, -// remove: func(s string) error { -// return nil -// }, -// }, -// } -// -// gc.wg.Add(1) -// go gc.recycleUnusedIndexFiles() -// time.Sleep(time.Second) -// cancel() -// gc.wg.Wait() -// }) -// -// t.Run("parse failed", func(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// gc := &garbageCollector{ -// ctx: ctx, -// cancel: cancel, -// wg: sync.WaitGroup{}, -// gcFileDuration: time.Millisecond * 300, -// gcMetaDuration: time.Millisecond * 300, -// metaTable: constructMetaTable(&catalog.Catalog{ -// Txn: &mockETCDKV{ -// 
remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// }), -// chunkManager: &chunkManagerMock{ -// removeWithPrefix: func(s string) error { -// return nil -// }, -// listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { -// if !recursive { -// return []string{"a/b/c/"}, []time.Time{{}}, nil -// } -// return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil -// }, -// remove: func(s string) error { -// return nil -// }, -// }, -// } -// -// gc.wg.Add(1) -// go gc.recycleUnusedIndexFiles() -// time.Sleep(time.Second) -// cancel() -// gc.wg.Wait() -// }) -// -// t.Run("ListWithPrefix failed", func(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// gc := &garbageCollector{ -// ctx: ctx, -// cancel: cancel, -// wg: sync.WaitGroup{}, -// gcFileDuration: time.Millisecond * 300, -// gcMetaDuration: time.Millisecond * 300, -// metaTable: constructMetaTable(&catalog.Catalog{ -// Txn: &mockETCDKV{ -// remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// }), -// chunkManager: &chunkManagerMock{ -// removeWithPrefix: func(s string) error { -// return nil -// }, -// listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { -// if !recursive { -// return []string{"a/b/1/"}, []time.Time{{}}, nil -// } -// return nil, nil, fmt.Errorf("error") -// }, -// remove: func(s string) error { -// return nil -// }, -// }, -// } -// -// gc.wg.Add(1) -// go gc.recycleUnusedIndexFiles() -// time.Sleep(time.Second) -// cancel() -// gc.wg.Wait() -// }) -// -// t.Run("remove failed", func(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// gc := &garbageCollector{ -// ctx: ctx, -// cancel: cancel, -// wg: sync.WaitGroup{}, -// gcFileDuration: time.Millisecond * 300, -// gcMetaDuration: time.Millisecond * 300, -// metaTable: constructMetaTable(&catalog.Catalog{ -// Txn: &mockETCDKV{ -// remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// }), -// chunkManager: &chunkManagerMock{ -// removeWithPrefix: func(s string) error { -// return nil -// }, -// listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { -// if !recursive { -// return []string{"a/b/1/"}, []time.Time{{}}, nil -// } -// return []string{"a/b/1/c"}, []time.Time{{}}, nil -// }, -// remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// } -// -// gc.wg.Add(1) -// go gc.recycleUnusedIndexFiles() -// time.Sleep(time.Second) -// cancel() -// gc.wg.Wait() -// }) -// -// t.Run("meta mark deleted", func(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// gc := &garbageCollector{ -// ctx: ctx, -// cancel: cancel, -// wg: sync.WaitGroup{}, -// gcFileDuration: time.Millisecond * 300, -// gcMetaDuration: time.Millisecond * 300, -// metaTable: constructMetaTable(&catalog.Catalog{ -// Txn: &mockETCDKV{ -// remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// }), -// chunkManager: &chunkManagerMock{ -// removeWithPrefix: func(s string) error { -// return nil -// }, -// listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { -// if !recursive { -// return []string{"a/b/1/"}, []time.Time{{}}, nil -// } -// return []string{"a/b/1/c"}, []time.Time{{}}, nil -// }, -// remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// } -// -// gc.wg.Add(1) -// go gc.recycleUnusedIndexFiles() -// time.Sleep(time.Second) -// cancel() -// gc.wg.Wait() -// 
}) -//} -// -//func TestIndexCoord_recycleUnusedMetaLoop(t *testing.T) { -// t.Run("success", func(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// gc := &garbageCollector{ -// ctx: ctx, -// cancel: cancel, -// wg: sync.WaitGroup{}, -// gcFileDuration: time.Millisecond * 300, -// gcMetaDuration: time.Millisecond * 300, -// metaTable: constructMetaTable(&catalog.Catalog{ -// Txn: &mockETCDKV{ -// remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// }), -// } -// gc.wg.Add(1) -// go gc.recycleUnusedSegIndexes() -// time.Sleep(time.Second) -// cancel() -// gc.wg.Wait() -// }) -// -// t.Run("remove meta failed", func(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// gc := &garbageCollector{ -// ctx: ctx, -// cancel: cancel, -// wg: sync.WaitGroup{}, -// gcFileDuration: time.Millisecond * 300, -// gcMetaDuration: time.Millisecond * 300, -// metaTable: &metaTable{ -// indexBuildID2Meta: map[UniqueID]*Meta{ -// 1: { -// indexMeta: &indexpb.IndexMeta{ -// IndexBuildID: 1, -// MarkDeleted: true, -// NodeID: 0, -// }, -// }, -// }, -// client: &mockETCDKV{ -// remove: func(s string) error { -// return fmt.Errorf("error") -// }, -// }, -// }, -// } -// gc.wg.Add(1) -// go gc.recycleUnusedMeta() -// time.Sleep(time.Second) -// cancel() -// gc.wg.Wait() -// }) -//} +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/metastore/kv/indexcoord" + "github.com/milvus-io/milvus/internal/metastore/model" +) + +func TestGarbageCollector_Start(t *testing.T) { + gc := newGarbageCollector(context.Background(), &metaTable{}, &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return nil + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + return []string{}, []time.Time{}, nil + }, + remove: func(s string) error { + return nil + }, + }, &IndexCoord{}) + + gc.gcMetaDuration = time.Millisecond * 300 + gc.gcFileDuration = time.Millisecond * 300 + + gc.Start() + gc.Stop() +} + +func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { + t.Run("index not in meta and remove with prefix failed", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + gc := &garbageCollector{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + gcFileDuration: time.Millisecond * 300, + gcMetaDuration: time.Millisecond * 300, + metaTable: constructMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + remove: func(s string) error { + return fmt.Errorf("error") + }, + }, + }), + chunkManager: &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return fmt.Errorf("error") + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + if !recursive { + return []string{"a/b/1/", "a/b/2/"}, []time.Time{{}, {}}, nil + } + return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil + }, + remove: func(s string) error { + return nil + }, + }, + } + + gc.wg.Add(1) + go gc.recycleUnusedIndexFiles() + time.Sleep(time.Second) + cancel() + gc.wg.Wait() + }) + + t.Run("load dir failed", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + gc := &garbageCollector{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + gcFileDuration: time.Millisecond * 300, + gcMetaDuration: time.Millisecond * 300, + metaTable: constructMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + remove: func(s string) error { + return fmt.Errorf("error") 
+ }, + }, + }), + chunkManager: &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return nil + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + if !recursive { + return nil, nil, fmt.Errorf("error") + } + return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil + }, + remove: func(s string) error { + return nil + }, + }, + } + + gc.wg.Add(1) + go gc.recycleUnusedIndexFiles() + time.Sleep(time.Second) + cancel() + gc.wg.Wait() + }) + + t.Run("parse failed", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + gc := &garbageCollector{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + gcFileDuration: time.Millisecond * 300, + gcMetaDuration: time.Millisecond * 300, + metaTable: constructMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + remove: func(s string) error { + return fmt.Errorf("error") + }, + }, + }), + chunkManager: &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return nil + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + if !recursive { + return []string{"a/b/c/"}, []time.Time{{}}, nil + } + return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil + }, + remove: func(s string) error { + return nil + }, + }, + } + + gc.wg.Add(1) + go gc.recycleUnusedIndexFiles() + time.Sleep(time.Second) + cancel() + gc.wg.Wait() + }) + + t.Run("ListWithPrefix failed", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + gc := &garbageCollector{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + gcFileDuration: time.Millisecond * 300, + gcMetaDuration: time.Millisecond * 300, + metaTable: constructMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + remove: func(s string) error { + return fmt.Errorf("error") + }, + }, + }), + chunkManager: &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return nil + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + if !recursive { + return []string{"a/b/1/"}, []time.Time{{}}, nil + } + return nil, nil, fmt.Errorf("error") + }, + remove: func(s string) error { + return nil + }, + }, + } + + gc.wg.Add(1) + go gc.recycleUnusedIndexFiles() + time.Sleep(time.Second) + cancel() + gc.wg.Wait() + }) + + t.Run("remove failed", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + gc := &garbageCollector{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + gcFileDuration: time.Millisecond * 300, + gcMetaDuration: time.Millisecond * 300, + metaTable: constructMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + remove: func(s string) error { + return fmt.Errorf("error") + }, + }, + }), + chunkManager: &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return nil + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + if !recursive { + return []string{"a/b/1/"}, []time.Time{{}}, nil + } + return []string{"a/b/1/c"}, []time.Time{{}}, nil + }, + remove: func(s string) error { + return fmt.Errorf("error") + }, + }, + } + + gc.wg.Add(1) + go gc.recycleUnusedIndexFiles() + time.Sleep(time.Second) + cancel() + gc.wg.Wait() + }) + + t.Run("meta mark deleted", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + gc := &garbageCollector{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + gcFileDuration: time.Millisecond * 300, + gcMetaDuration: time.Millisecond * 300, + metaTable: constructMetaTable(&indexcoord.Catalog{ + Txn: 
&mockETCDKV{ + remove: func(s string) error { + return fmt.Errorf("error") + }, + }, + }), + chunkManager: &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return nil + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + if !recursive { + return []string{"a/b/1/"}, []time.Time{{}}, nil + } + return []string{"a/b/1/c"}, []time.Time{{}}, nil + }, + remove: func(s string) error { + return fmt.Errorf("error") + }, + }, + } + + gc.wg.Add(1) + go gc.recycleUnusedIndexFiles() + time.Sleep(time.Second) + cancel() + gc.wg.Wait() + }) +} + +func TestIndexCoord_recycleUnusedMetaLoop(t *testing.T) { + t.Run("success", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + gc := &garbageCollector{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + gcFileDuration: time.Millisecond * 300, + gcMetaDuration: time.Millisecond * 300, + metaTable: constructMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + remove: func(s string) error { + return fmt.Errorf("error") + }, + multiSave: func(m map[string]string) error { + return nil + }, + }, + }), + indexCoordClient: &IndexCoord{ + dataCoordClient: NewDataCoordMock(), + }, + } + gc.wg.Add(1) + go gc.recycleUnusedSegIndexes() + time.Sleep(time.Second) + cancel() + gc.wg.Wait() + }) + + t.Run("remove meta failed", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + gc := &garbageCollector{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + gcFileDuration: time.Millisecond * 300, + gcMetaDuration: time.Millisecond * 300, + metaTable: &metaTable{ + buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ + 1: { + SegmentID: 0, + CollectionID: 0, + PartitionID: 0, + NumRows: 0, + IndexID: 0, + BuildID: 0, + NodeID: 0, + IndexVersion: 0, + IndexState: 0, + FailReason: "", + IsDeleted: true, + CreateTime: 0, + IndexFilePaths: nil, + IndexSize: 0, + }, + }, + catalog: &indexcoord.Catalog{ + Txn: &mockETCDKV{ + remove: func(s string) error { + return fmt.Errorf("error") + }, + }, + }, + }, + indexCoordClient: &IndexCoord{ + dataCoordClient: NewDataCoordMock(), + }, + } + gc.wg.Add(1) + go gc.recycleUnusedSegIndexes() + time.Sleep(time.Second) + cancel() + gc.wg.Wait() + }) +} + +func TestGarbageCollector_Recycle(t *testing.T) { + ctx, canel := context.WithCancel(context.Background()) + gc := &garbageCollector{ + ctx: ctx, + cancel: canel, + wg: sync.WaitGroup{}, + gcFileDuration: 300 * time.Millisecond, + gcMetaDuration: 300 * time.Millisecond, + metaTable: createMetaTable(&indexcoord.Catalog{Txn: NewMockEtcdKV()}), + chunkManager: &chunkManagerMock{ + listWithPrefix: func(s string, b bool) ([]string, []time.Time, error) { + return nil, nil, nil + }, + }, + indexCoordClient: &IndexCoord{ + dataCoordClient: NewDataCoordMock(), + }, + } + + gc.Start() + time.Sleep(time.Second) + err := gc.metaTable.MarkIndexAsDeleted(collID, []int64{indexID}) + assert.NoError(t, err) + time.Sleep(time.Second * 10) + gc.Stop() +} diff --git a/internal/indexcoord/index_builder.go b/internal/indexcoord/index_builder.go index d7172085f9277..4f94d0c9b4049 100644 --- a/internal/indexcoord/index_builder.go +++ b/internal/indexcoord/index_builder.go @@ -59,7 +59,7 @@ func newIndexBuilder(ctx context.Context, ic *IndexCoord, metaTable *metaTable, ic: ic, tasks: make(map[int64]indexTaskState), notifyChan: make(chan struct{}, 1), - scheduleDuration: time.Second * 3, + scheduleDuration: time.Second, } ib.reloadFromKV(aliveNodes) return ib @@ -129,7 +129,9 @@ func (ib *indexBuilder) 
enqueue(buildID UniqueID) { ib.taskMutex.Lock() defer ib.taskMutex.Unlock() - ib.tasks[buildID] = indexTaskInit + if _, ok := ib.tasks[buildID]; !ok { + ib.tasks[buildID] = indexTaskInit + } } func (ib *indexBuilder) schedule() { @@ -157,7 +159,6 @@ func (ib *indexBuilder) schedule() { func (ib *indexBuilder) run() { ib.taskMutex.RLock() - log.Info("index builder task schedule", zap.Int("task num", len(ib.tasks))) buildIDs := make([]UniqueID, 0, len(ib.tasks)) for tID := range ib.tasks { buildIDs = append(buildIDs, tID) @@ -167,13 +168,15 @@ func (ib *indexBuilder) run() { sort.Slice(buildIDs, func(i, j int) bool { return buildIDs[i] < buildIDs[j] }) + if len(buildIDs) > 0 { + log.Info("index builder task schedule", zap.Int("task num", len(buildIDs))) + } for _, buildID := range buildIDs { ib.process(buildID) } } func (ib *indexBuilder) process(buildID UniqueID) { - defer ib.notify() ib.taskMutex.RLock() state := ib.tasks[buildID] ib.taskMutex.RUnlock() @@ -254,6 +257,7 @@ func (ib *indexBuilder) process(buildID UniqueID) { if segmentsInfo.Status.ErrorCode != commonpb.ErrorCode_Success { log.Error("IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID), zap.Int64("buildID", buildID), zap.String("failReason", segmentsInfo.Status.Reason)) + // TODO: delete after QueryCoordV2 if segmentsInfo.Status.GetReason() == msgSegmentNotFound(meta.SegmentID) { updateStateFunc(buildID, indexTaskDeleted) return @@ -362,8 +366,11 @@ func (ib *indexBuilder) process(buildID UniqueID) { case indexTaskDeleted: log.Debug("index task state is deleted, try to release reference lock", zap.Int64("buildID", buildID)) - - if exist && meta.NodeID != 0 { + // TODO: delete after QueryCoordV2 + if err := ib.meta.MarkSegmentsIndexAsDeletedByBuildID([]int64{buildID}); err != nil { + return + } + if meta.NodeID != 0 { if !ib.dropIndexTask(buildID, meta.NodeID) { return } @@ -492,7 +499,7 @@ func (ib *indexBuilder) releaseLockAndResetTask(buildID UniqueID, nodeID UniqueI return nil } -func (ib *indexBuilder) markTasksAsDeleted(buildIDs []UniqueID) { +func (ib *indexBuilder) markTasksAsDeleted(buildIDs []UniqueID) error { defer ib.notify() ib.taskMutex.Lock() @@ -504,6 +511,7 @@ func (ib *indexBuilder) markTasksAsDeleted(buildIDs []UniqueID) { log.Debug("index task has been deleted", zap.Int64("buildID", buildID)) } } + return nil } func (ib *indexBuilder) nodeDown(nodeID UniqueID) { diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 9322a369f44a1..10eaf2a3c1e13 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -28,14 +28,13 @@ import ( "syscall" "time" - "github.com/milvus-io/milvus/internal/kv" - "go.etcd.io/etcd/api/v3/mvccpb" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metastore/model" @@ -437,7 +436,6 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe return ret, nil } - i.garbageCollector.Notify() ret.ErrorCode = commonpb.ErrorCode_Success return ret, nil } @@ -515,7 +513,7 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS States: make([]*indexpb.SegmentIndexState, 0), } indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName) - if 
len(indexID2CreateTs) != 1 { + if len(indexID2CreateTs) == 0 { errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName) log.Error("IndexCoord get index state fail", zap.Int64("collectionID", req.CollectionID), zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg)) @@ -526,15 +524,13 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS }, }, nil } - for indexID := range indexID2CreateTs { - for _, segID := range req.SegmentIDs { - state := i.metaTable.GetSegmentIndexState(segID, indexID) - ret.States = append(ret.States, &indexpb.SegmentIndexState{ - SegmentID: segID, - State: state.state, - FailReason: state.failReason, - }) - } + for _, segID := range req.SegmentIDs { + state := i.metaTable.GetSegmentIndexState(segID) + ret.States = append(ret.States, &indexpb.SegmentIndexState{ + SegmentID: segID, + State: state.state, + FailReason: state.failReason, + }) } return ret, nil } @@ -698,7 +694,7 @@ func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInf } log.Info("IndexCoord GetIndexFilePaths ", zap.Int("segIDs num", len(req.SegmentIDs)), - zap.Int("file path num", len(ret.SegmentInfo)), zap.Any("ret ", ret.SegmentInfo)) + zap.Int("file path num", len(ret.SegmentInfo))) return ret, nil } diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index 2560db3bcedbb..2351129578919 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -192,7 +192,7 @@ func (mt *metaTable) GetMeta(buildID UniqueID) (*model.SegmentIndex, bool) { defer mt.segmentIndexLock.RUnlock() segIdx, ok := mt.buildID2SegmentIndex[buildID] - if ok { + if ok && !segIdx.IsDeleted { return model.CloneSegmentIndex(segIdx), true } @@ -621,29 +621,36 @@ func (mt *metaTable) GetSegmentIndexes(segID UniqueID) []*model.SegmentIndex { segIndexInfos := make([]*model.SegmentIndex, 0) if segIndexes, ok := mt.segmentIndexes[segID]; ok { for _, segIdx := range segIndexes { + if segIdx.IsDeleted { + continue + } segIndexInfos = append(segIndexInfos, model.CloneSegmentIndex(segIdx)) } } return segIndexInfos } -func (mt *metaTable) GetSegmentIndexState(segmentID UniqueID, indexID UniqueID) IndexState { +func (mt *metaTable) GetSegmentIndexState(segmentID UniqueID) IndexState { mt.segmentIndexLock.RLock() defer mt.segmentIndexLock.RUnlock() + state := IndexState{ + state: commonpb.IndexState_Finished, + failReason: "", + } if segIdxes, ok := mt.segmentIndexes[segmentID]; ok { - if segIdx, ok := segIdxes[indexID]; ok && !segIdx.IsDeleted { - return IndexState{ - state: segIdx.IndexState, - failReason: segIdx.FailReason, + for _, segIdx := range segIdxes { + if segIdx.IsDeleted { + continue + } + if segIdx.IndexState != commonpb.IndexState_Finished { + state.state = segIdx.IndexState + state.failReason = segIdx.FailReason } } } - return IndexState{ - state: commonpb.IndexState_IndexStateNone, - failReason: "there is no index", - } + return state } // GetIndexBuildProgress gets the index progress for indexID from meta table. 
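The hunk above collapses the old per-index lookup into a single per-segment state: builds marked IsDeleted are skipped, the default answer is Finished, and any live, unfinished build pulls the reported state (and fail reason) down. Below is a minimal standalone sketch of that rule only; segmentIndex and indexState are trimmed stand-ins for the project's model.SegmentIndex and IndexState, not the actual types.

package main

import "fmt"

// Trimmed stand-ins (assumption: only the fields the aggregation needs).
type segmentIndex struct {
	IndexState string // e.g. "Finished", "InProgress"
	FailReason string
	IsDeleted  bool
}

type indexState struct {
	state      string
	failReason string
}

// getSegmentIndexState mirrors the rule the patched metaTable.GetSegmentIndexState
// applies per segment: start from Finished, ignore deleted builds, and let any
// live unfinished build override the state and carry its fail reason.
func getSegmentIndexState(segIdxes []segmentIndex) indexState {
	state := indexState{state: "Finished"}
	for _, segIdx := range segIdxes {
		if segIdx.IsDeleted {
			continue
		}
		if segIdx.IndexState != "Finished" {
			state.state = segIdx.IndexState
			state.failReason = segIdx.FailReason
		}
	}
	return state
}

func main() {
	// A deleted in-progress build no longer blocks the segment from reporting Finished.
	fmt.Println(getSegmentIndexState([]segmentIndex{
		{IndexState: "Finished"},
		{IndexState: "InProgress", IsDeleted: true},
	}))
}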
@@ -861,6 +868,19 @@ func (mt *metaTable) GetDeletedIndexes() []*model.Index { return indexes } +func (mt *metaTable) GetDeletedSegmentIndexes() []*model.SegmentIndex { + mt.segmentIndexLock.RLock() + defer mt.segmentIndexLock.RUnlock() + + segIndexes := make([]*model.SegmentIndex, 0) + for _, segIdx := range mt.buildID2SegmentIndex { + if segIdx.IsDeleted { + segIndexes = append(segIndexes, segIdx) + } + } + return segIndexes +} + func (mt *metaTable) GetBuildIDsFromIndexID(indexID UniqueID) []UniqueID { mt.segmentIndexLock.RLock() defer mt.segmentIndexLock.RUnlock() @@ -983,3 +1003,32 @@ func (mt *metaTable) FinishTask(taskInfo *indexpb.IndexTaskInfo) error { return mt.updateSegIndexMeta(segIdx, updateFunc) } + +func (mt *metaTable) MarkSegmentsIndexAsDeletedByBuildID(buildIDs []UniqueID) error { + mt.segmentIndexLock.Lock() + defer mt.segmentIndexLock.Unlock() + + segIdxes := make([]*model.SegmentIndex, 0) + for _, buildID := range buildIDs { + if segIdx, ok := mt.buildID2SegmentIndex[buildID]; ok { + if segIdx.IsDeleted { + continue + } + clonedSegIdx := model.CloneSegmentIndex(segIdx) + clonedSegIdx.IsDeleted = true + segIdxes = append(segIdxes, clonedSegIdx) + } + } + if len(segIdxes) == 0 { + log.Debug("IndexCoord metaTable MarkSegmentsIndexAsDeletedByBuildID success, already have deleted", + zap.Int64s("buildIDs", buildIDs)) + return nil + } + err := mt.alterSegmentIndexes(segIdxes) + if err != nil { + log.Error("IndexCoord metaTable MarkSegmentsIndexAsDeletedByBuildID fail", zap.Int64s("buildIDs", buildIDs), zap.Error(err)) + return err + } + log.Info("IndexCoord metaTable MarkSegmentsIndexAsDeletedByBuildID success", zap.Int64s("buildIDs", buildIDs)) + return nil +} diff --git a/internal/indexcoord/meta_table_test.go b/internal/indexcoord/meta_table_test.go index 565a9863b767b..dfed9f8a329c1 100644 --- a/internal/indexcoord/meta_table_test.go +++ b/internal/indexcoord/meta_table_test.go @@ -634,11 +634,11 @@ func TestMetaTable_GetSegmentIndexes(t *testing.T) { func TestMetaTable_GetSegmentIndexState(t *testing.T) { mt := constructMetaTable(&indexcoord.Catalog{}) - state := mt.GetSegmentIndexState(segID, indexID) + state := mt.GetSegmentIndexState(segID) assert.Equal(t, commonpb.IndexState_Finished, state.state) - state = mt.GetSegmentIndexState(segID+1, indexID) - assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) + state = mt.GetSegmentIndexState(segID + 1) + assert.Equal(t, commonpb.IndexState_Finished, state.state) } func TestMetaTable_GetIndexBuildProgress(t *testing.T) { @@ -1140,3 +1140,39 @@ func TestMetaTable_FinishTask(t *testing.T) { assert.Error(t, err) }) } + +func TestMetaTable_MarkSegmentsIndexAsDeletedByBuildID(t *testing.T) { + t.Run("success", func(t *testing.T) { + mt := constructMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return nil + }, + }, + }) + err := mt.MarkSegmentsIndexAsDeletedByBuildID([]UniqueID{buildID}) + assert.NoError(t, err) + + err = mt.MarkSegmentsIndexAsDeletedByBuildID([]UniqueID{buildID}) + assert.NoError(t, err) + }) + + t.Run("fail", func(t *testing.T) { + mt := constructMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + }, + }) + err := mt.MarkSegmentsIndexAsDeletedByBuildID([]UniqueID{buildID}) + assert.Error(t, err) + }) +} + +func TestMetaTable_GetDeletedSegmentIndexes(t *testing.T) { + mt := createMetaTable(&indexcoord.Catalog{}) + + segIndexes := mt.GetDeletedSegmentIndexes() + 
assert.Equal(t, 1, len(segIndexes)) +} diff --git a/internal/indexcoord/node_manager_test.go b/internal/indexcoord/node_manager_test.go index ce91c1e4ecbf1..18f7feee5417a 100644 --- a/internal/indexcoord/node_manager_test.go +++ b/internal/indexcoord/node_manager_test.go @@ -16,134 +16,130 @@ package indexcoord -//func TestNodeManager_PeekClient(t *testing.T) { -// t.Run("success", func(t *testing.T) { -// nm := NewNodeManager(context.Background()) -// meta := &Meta{ -// indexMeta: &indexpb.IndexMeta{ -// Req: &indexpb.BuildIndexRequest{ -// DataPaths: []string{"PeekClient-1", "PeekClient-2"}, -// NumRows: 1000, -// TypeParams: []*commonpb.KeyValuePair{ -// { -// Key: "dim", -// Value: "128", -// }, -// }, -// FieldSchema: &schemapb.FieldSchema{ -// DataType: schemapb.DataType_FloatVector, -// }, -// }, -// }, -// } -// nodeID, client := nm.PeekClient(meta) -// assert.Equal(t, int64(-1), nodeID) -// assert.Nil(t, client) -// err := nm.AddNode(1, "indexnode-1") -// assert.Nil(t, err) -// nm.pq.SetMemory(1, 100) -// nodeID2, client2 := nm.PeekClient(meta) -// assert.Equal(t, int64(0), nodeID2) -// assert.Nil(t, client2) -// }) -// -// t.Run("multiple unavailable IndexNode", func(t *testing.T) { -// nm := &NodeManager{ -// ctx: context.TODO(), -// nodeClients: map[UniqueID]types.IndexNode{ -// 1: &indexnode.MockIndexNode{ -// GetTaskSlotsMock: func(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) { -// return &indexpb.GetTaskSlotsResponse{ -// Status: &commonpb.Status{ -// ErrorCode: commonpb.ErrorCode_UnexpectedError, -// }, -// }, errors.New("error") -// }, -// }, -// 2: &indexnode.MockIndexNode{ -// GetTaskSlotsMock: func(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) { -// return &indexpb.GetTaskSlotsResponse{ -// Status: &commonpb.Status{ -// ErrorCode: commonpb.ErrorCode_UnexpectedError, -// }, -// }, errors.New("error") -// }, -// }, -// 3: &indexnode.MockIndexNode{ -// GetTaskSlotsMock: func(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) { -// return &indexpb.GetTaskSlotsResponse{ -// Status: &commonpb.Status{ -// ErrorCode: commonpb.ErrorCode_UnexpectedError, -// }, -// }, errors.New("error") -// }, -// }, -// 4: &indexnode.MockIndexNode{ -// GetTaskSlotsMock: func(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) { -// return &indexpb.GetTaskSlotsResponse{ -// Status: &commonpb.Status{ -// ErrorCode: commonpb.ErrorCode_UnexpectedError, -// }, -// }, errors.New("error") -// }, -// }, -// 5: &indexnode.MockIndexNode{ -// GetTaskSlotsMock: func(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) { -// return &indexpb.GetTaskSlotsResponse{ -// Status: &commonpb.Status{ -// ErrorCode: commonpb.ErrorCode_UnexpectedError, -// Reason: "fail reason", -// }, -// }, nil -// }, -// }, -// 6: &indexnode.MockIndexNode{ -// GetTaskSlotsMock: func(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) { -// return &indexpb.GetTaskSlotsResponse{ -// Status: &commonpb.Status{ -// ErrorCode: commonpb.ErrorCode_UnexpectedError, -// Reason: "fail reason", -// }, -// }, nil -// }, -// }, -// 7: &indexnode.MockIndexNode{ -// GetTaskSlotsMock: func(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) { -// return &indexpb.GetTaskSlotsResponse{ -// Status: &commonpb.Status{ -// ErrorCode: 
commonpb.ErrorCode_UnexpectedError, -// Reason: "fail reason", -// }, -// }, nil -// }, -// }, -// 8: &indexnode.MockIndexNode{ -// GetTaskSlotsMock: func(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) { -// return &indexpb.GetTaskSlotsResponse{ -// Slots: 1, -// Status: &commonpb.Status{ -// ErrorCode: commonpb.ErrorCode_Success, -// Reason: "", -// }, -// }, nil -// }, -// }, -// 9: &indexnode.MockIndexNode{ -// GetTaskSlotsMock: func(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) { -// return &indexpb.GetTaskSlotsResponse{ -// Slots: 10, -// Status: &commonpb.Status{ -// ErrorCode: commonpb.ErrorCode_Success, -// Reason: "", -// }, -// }, nil -// }, -// }, -// }, -// } -// -// nodeID, client := nm.PeekClient(&Meta{}) -// assert.NotNil(t, client) -// assert.Contains(t, []UniqueID{8, 9}, nodeID) -// }) -//} +import ( + "context" + "errors" + "testing" + + "github.com/milvus-io/milvus/internal/indexnode" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/types" + "github.com/stretchr/testify/assert" +) + +func TestNodeManager_PeekClient(t *testing.T) { + t.Run("success", func(t *testing.T) { + nm := NewNodeManager(context.Background()) + nodeID, client := nm.PeekClient(&model.SegmentIndex{}) + assert.Equal(t, int64(-1), nodeID) + assert.Nil(t, client) + err := nm.AddNode(1, "indexnode-1") + assert.Nil(t, err) + nm.pq.SetMemory(1, 100) + nodeID2, client2 := nm.PeekClient(&model.SegmentIndex{}) + assert.Equal(t, int64(0), nodeID2) + assert.Nil(t, client2) + }) + + t.Run("multiple unavailable IndexNode", func(t *testing.T) { + nm := &NodeManager{ + ctx: context.TODO(), + nodeClients: map[UniqueID]types.IndexNode{ + 1: &indexnode.Mock{ + CallGetJobStats: func(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { + return &indexpb.GetJobStatsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("error") + }, + }, + 2: &indexnode.Mock{ + CallGetJobStats: func(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { + return &indexpb.GetJobStatsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("error") + }, + }, + 3: &indexnode.Mock{ + CallGetJobStats: func(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { + return &indexpb.GetJobStatsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("error") + }, + }, + 4: &indexnode.Mock{ + CallGetJobStats: func(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { + return &indexpb.GetJobStatsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("error") + }, + }, + 5: &indexnode.Mock{ + CallGetJobStats: func(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { + return &indexpb.GetJobStatsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "fail reason", + }, + }, nil + }, + }, + 6: &indexnode.Mock{ + CallGetJobStats: func(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { + return 
&indexpb.GetJobStatsResponse{
+						Status: &commonpb.Status{
+							ErrorCode: commonpb.ErrorCode_UnexpectedError,
+							Reason:    "fail reason",
+						},
+					}, nil
+				},
+			},
+			7: &indexnode.Mock{
+				CallGetJobStats: func(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
+					return &indexpb.GetJobStatsResponse{
+						Status: &commonpb.Status{
+							ErrorCode: commonpb.ErrorCode_UnexpectedError,
+							Reason:    "fail reason",
+						},
+					}, nil
+				},
+			},
+			8: &indexnode.Mock{
+				CallGetJobStats: func(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
+					return &indexpb.GetJobStatsResponse{
+						TaskSlots: 1,
+						Status: &commonpb.Status{
+							ErrorCode: commonpb.ErrorCode_Success,
+							Reason:    "",
+						},
+					}, nil
+				},
+			},
+			9: &indexnode.Mock{
+				CallGetJobStats: func(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
+					return &indexpb.GetJobStatsResponse{
+						TaskSlots: 10,
+						Status: &commonpb.Status{
+							ErrorCode: commonpb.ErrorCode_Success,
+							Reason:    "",
+						},
+					}, nil
+				},
+			},
+		},
+	}
+
+		nodeID, client := nm.PeekClient(&model.SegmentIndex{})
+		assert.NotNil(t, client)
+		assert.Contains(t, []UniqueID{8, 9}, nodeID)
+	})
+}
diff --git a/internal/indexcoord/task_state.go b/internal/indexcoord/task_state.go
index 3aabed909da0a..8dcbc1a2e7574 100644
--- a/internal/indexcoord/task_state.go
+++ b/internal/indexcoord/task_state.go
@@ -29,8 +29,8 @@ const (
 	indexTaskRetry
 	// task has been deleted.
 	indexTaskDeleted
-	// task needs to recycle meta info on IndexNode
-	indexTaskRecycle
+	// task needs to prepare segment info on IndexNode
+	indexTaskPrepare
 )
 
 var TaskStateNames = map[indexTaskState]string{
@@ -39,8 +39,7 @@ var TaskStateNames = map[indexTaskState]string{
 	2: "Done",
 	3: "Retry",
 	4: "Deleted",
-	5: "Recycle",
-	6: "Wait",
+	5: "Prepare",
 }
 
 func (x indexTaskState) String() string {
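For reference, the rewritten TestNodeManager_PeekClient above only pins down an eligibility rule: a node whose GetJobStats call errors, reports a failure status, or has no free task slots must be skipped, and either of the two healthy nodes (8 or 9) may be chosen. The real NodeManager.PeekClient implementation is not part of this excerpt, so the following is only a sketch of that rule with trimmed stand-in types, not the project's code.

package main

import (
	"errors"
	"fmt"
)

// Trimmed stand-in for an IndexNode's GetJobStats reply (assumption: only the
// fields the selection cares about).
type jobStats struct {
	TaskSlots int64
	Success   bool
	Err       error
}

// peekNode returns any node that answered successfully and still has task
// slots, or -1 if none qualifies; this is the criterion the test's assertions
// encode (it accepts either node 8 or node 9).
func peekNode(stats map[int64]jobStats) int64 {
	for nodeID, s := range stats {
		if s.Err != nil || !s.Success || s.TaskSlots <= 0 {
			continue
		}
		return nodeID
	}
	return -1
}

func main() {
	stats := map[int64]jobStats{
		1: {Err: errors.New("error")},          // RPC failed
		5: {Success: false},                    // unhealthy status
		8: {Success: true, TaskSlots: 1},       // eligible
		9: {Success: true, TaskSlots: 10},      // eligible
	}
	fmt.Println(peekNode(stats)) // prints 8 or 9, depending on map iteration order
}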