Skip to content

Commit

Permalink
fix: [2.5.4] Skip text index creation when segment is zero after sorting (milvus-io#39978)
Browse files Browse the repository at this point in the history

issue: milvus-io#39961 

master pr: milvus-io#39962

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Feb 19, 2025
1 parent 9b7ebb3 commit 8009af7
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 1 deletion.
17 changes: 17 additions & 0 deletions internal/indexnode/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,20 @@ func generateTestData(collID, partID, segID int64, num int) ([]*Blob, error) {
blobs, err := insertCodec.Serialize(partID, segID, data)
return blobs, err
}

// generateDeleteData builds a single serialized delete-log blob for the given
// collection/partition/segment, covering int64 primary keys 1..num. Each key's
// delete timestamp is key+1, mirroring the timestamps used by the insert data
// generator so the delete log fully shadows the inserted rows.
func generateDeleteData(collID, partID, segID int64, num int) ([]*Blob, error) {
	primaryKeys := make([]storage.PrimaryKey, num)
	timestamps := make([]storage.Timestamp, num)
	for idx := 0; idx < num; idx++ {
		key := idx + 1
		primaryKeys[idx] = storage.NewInt64PrimaryKey(int64(key))
		timestamps[idx] = storage.Timestamp(key + 1)
	}

	deleteData := &storage.DeleteData{
		Pks:      primaryKeys,
		Tss:      timestamps,
		RowCount: int64(num),
	}
	blob, err := storage.NewDeleteCodec().Serialize(collID, partID, segID, deleteData)
	return []*Blob{blob}, err
}
95 changes: 94 additions & 1 deletion internal/indexnode/indexnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ type IndexNodeSuite struct {
logID int64
numRows int64
data []*Blob
deleteData []*Blob
in *IndexNode
storageConfig *indexpb.StorageConfig
cm storage.ChunkManager
Expand All @@ -205,7 +206,10 @@ func (s *IndexNodeSuite) SetupTest() {
Params.MinioCfg.RootPath.SwapTempValue("indexnode-ut")

var err error
s.data, err = generateTestData(s.collID, s.partID, s.segID, 3000)
s.data, err = generateTestData(s.collID, s.partID, s.segID, int(s.numRows))
s.NoError(err)

s.deleteData, err = generateDeleteData(s.collID, s.partID, s.segID, int(s.numRows))
s.NoError(err)

s.storageConfig = &indexpb.StorageConfig{
Expand Down Expand Up @@ -250,6 +254,13 @@ func (s *IndexNodeSuite) SetupTest() {
err = s.cm.Write(context.Background(), filePath, blob.GetValue())
s.NoError(err)
}
for i, blob := range s.deleteData {
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
filePath, err := binlog.BuildLogPath(storage.DeleteBinlog, s.collID, s.partID, s.segID, fID, logID+int64(i))
s.NoError(err)
err = s.cm.Write(context.Background(), filePath, blob.GetValue())
s.NoError(err)
}
}

func (s *IndexNodeSuite) TearDownSuite() {
Expand Down Expand Up @@ -688,4 +699,86 @@ func (s *IndexNodeSuite) Test_CreateStatsTask() {
err = merr.Error(status)
s.NoError(err)
})

s.Run("all deleted", func() {
deltaLogs := make([]*datapb.FieldBinlog, 0)
for i := range s.deleteData {
deltaLogs = append(deltaLogs, &datapb.FieldBinlog{
Binlogs: []*datapb.Binlog{{
EntriesNum: s.numRows,
LogSize: int64(len(s.deleteData[0].GetValue())),
MemorySize: s.deleteData[0].GetMemorySize(),
LogID: s.logID + int64(i),
TimestampFrom: 1,
TimestampTo: 3001,
}},
})
}
taskID := int64(200)
req := &workerpb.CreateStatsRequest{
ClusterID: "cluster2",
TaskID: taskID,
CollectionID: s.collID,
PartitionID: s.partID,
InsertChannel: "ch1",
SegmentID: s.segID,
InsertLogs: fieldBinlogs,
DeltaLogs: deltaLogs,
StorageConfig: s.storageConfig,
Schema: generateTestSchema(),
TargetSegmentID: s.segID + 1,
StartLogID: s.logID + 100,
EndLogID: s.logID + 200,
NumRows: s.numRows,
BinlogMaxSize: 131000,
SubJobType: indexpb.StatsSubJob_Sort,
}

status, err := s.in.CreateJobV2(ctx, &workerpb.CreateJobV2Request{
ClusterID: "cluster2",
TaskID: taskID,
JobType: indexpb.JobType_JobTypeStatsJob,
Request: &workerpb.CreateJobV2Request_StatsRequest{
StatsRequest: req,
},
})
s.NoError(err)
err = merr.Error(status)
s.NoError(err)

for {
resp, err := s.in.QueryJobsV2(ctx, &workerpb.QueryJobsV2Request{
ClusterID: "cluster2",
TaskIDs: []int64{taskID},
JobType: indexpb.JobType_JobTypeStatsJob,
})
s.NoError(err)
err = merr.Error(resp.GetStatus())
s.NoError(err)
s.Equal(1, len(resp.GetStatsJobResults().GetResults()))
if resp.GetStatsJobResults().GetResults()[0].GetState() == indexpb.JobState_JobStateFinished {
s.Zero(len(resp.GetStatsJobResults().GetResults()[0].GetInsertLogs()))
s.Equal(int64(0), resp.GetStatsJobResults().GetResults()[0].GetNumRows())
break
}
s.Equal(indexpb.JobState_JobStateInProgress, resp.GetStatsJobResults().GetResults()[0].GetState())
time.Sleep(time.Second)
}

slotResp, err := s.in.GetJobStats(ctx, &workerpb.GetJobStatsRequest{})
s.NoError(err)
err = merr.Error(slotResp.GetStatus())
s.NoError(err)

s.Equal(int64(1), slotResp.GetTaskSlots())

status, err = s.in.DropJobsV2(ctx, &workerpb.DropJobsV2Request{
ClusterID: "cluster2",
TaskIDs: []int64{taskID},
JobType: indexpb.JobType_JobTypeStatsJob,
})
s.NoError(err)
err = merr.Error(status)
s.NoError(err)
})
}
5 changes: 5 additions & 0 deletions internal/indexnode/task_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ func (st *statsTask) Execute(ctx context.Context) error {
}
}

if len(insertLogs) == 0 {
log.Ctx(ctx).Info("there is no insertBinlogs, skip creating text index")
return nil
}

if st.req.GetSubJobType() == indexpb.StatsSubJob_Sort || st.req.GetSubJobType() == indexpb.StatsSubJob_TextIndexJob {
err = st.createTextIndex(ctx,
st.req.GetStorageConfig(),
Expand Down

0 comments on commit 8009af7

Please sign in to comment.