fix: deleted the sealed segment data accidentally (#39422)
issue: #39333
pr: #39421

Signed-off-by: zhenshan.cao <[email protected]>
czs007 authored Jan 20, 2025
1 parent 6342237 commit 964000f
Showing 5 changed files with 248 additions and 8 deletions.
35 changes: 35 additions & 0 deletions internal/datacoord/mock_segment_manager.go

Generated mock file; diff not shown.

33 changes: 33 additions & 0 deletions internal/datacoord/mock_trigger_manager.go

Generated mock file; diff not shown.

24 changes: 16 additions & 8 deletions internal/datacoord/segment_manager.go
@@ -90,6 +90,8 @@ type Manager interface {
ExpireAllocations(ctx context.Context, channel string, ts Timestamp)
// DropSegmentsOfChannel drops all segments in a channel
DropSegmentsOfChannel(ctx context.Context, channel string)
// CleanZeroSealedSegmentsOfChannel tries to clean truly empty sealed segments in a channel
CleanZeroSealedSegmentsOfChannel(ctx context.Context, channel string, cpTs Timestamp)
}

// Allocation records the allocation info
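
The new interface method is wired into the checkpoint-update path in services.go later in this diff. As a minimal sketch of the intended call pattern (the wrapper name onCheckpointUpdated is hypothetical and not part of this commit; it assumes the internal/datacoord package context and the msgpb import already used there):

// Sketch only: mirrors the guard and call added to UpdateChannelCheckpoint in services.go below.
func onCheckpointUpdated(ctx context.Context, mgr Manager, pos *msgpb.MsgPosition) {
	// Skip malformed checkpoints, as the server-side change does.
	if pos == nil || pos.GetMsgID() == nil || pos.GetChannelName() == "" {
		return
	}
	// Ask the segment manager to drop sealed segments of this channel that are verifiably empty at the checkpoint timestamp.
	mgr.CleanZeroSealedSegmentsOfChannel(ctx, pos.GetChannelName(), pos.GetTimestamp())
}
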
@@ -511,9 +513,6 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
return nil, err
}

// TODO: It's too frequent; perhaps each channel could check once per minute instead.
s.cleanupSealedSegment(ctx, t, channel)

sealed, ok := s.channel2Sealed.Get(channel)
if !ok {
return nil, nil
@@ -565,26 +564,35 @@ func (s *SegmentManager) ExpireAllocations(ctx context.Context, channel string,
})
}

func (s *SegmentManager) cleanupSealedSegment(ctx context.Context, ts Timestamp, channel string) {
func (s *SegmentManager) CleanZeroSealedSegmentsOfChannel(ctx context.Context, channel string, cpTs Timestamp) {
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)

sealed, ok := s.channel2Sealed.Get(channel)
if !ok {
log.Info("try remove empty sealed segment after channel cp updated failed to get channel", zap.String("channel", channel))
return
}
sealed.Range(func(id int64) bool {
segment := s.meta.GetHealthySegment(ctx, id)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id))
log.Warn("try remove empty sealed segment, failed to get segment, remove it in channel2Sealed", zap.String("channel", channel), zap.Int64("segmentID", id))
sealed.Remove(id)
return true
}
// Check if segment is empty
if segment.GetLastExpireTime() <= ts && segment.currRows == 0 {
log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id))
if segment.GetLastExpireTime() > 0 && segment.GetLastExpireTime() < cpTs && segment.currRows == 0 && segment.GetNumOfRows() == 0 {
log.Info("try remove empty sealed segment after channel cp updated",
zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id),
zap.String("channel", channel), zap.Any("cpTs", cpTs))
if err := s.meta.SetState(ctx, id, commonpb.SegmentState_Dropped); err != nil {
log.Warn("failed to set segment state to dropped", zap.String("channel", channel),
log.Warn("try remove empty sealed segment after channel cp updated, failed to set segment state to dropped", zap.String("channel", channel),
zap.Int64("segmentID", id), zap.Error(err))
} else {
sealed.Remove(id)
log.Info("succeed to remove empty sealed segment",
zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id),
zap.String("channel", channel), zap.Any("cpTs", cpTs), zap.Any("expireTs", segment.GetLastExpireTime()))
}
}
return true
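
The heart of the fix is the stricter emptiness check above. The removed code dropped a sealed segment once its last expire time passed the flush timestamp and its in-memory row count was zero; the new code additionally requires a non-zero last expire time, a zero persisted row count, and a channel checkpoint that has already moved past the segment's last expire time. Restated as a standalone predicate (a sketch only; the helper name isTrulyEmptySealedSegment is hypothetical, the accessors are the ones used in the diff):

// Sketch only: restates the condition applied in CleanZeroSealedSegmentsOfChannel.
func isTrulyEmptySealedSegment(segment *SegmentInfo, cpTs Timestamp) bool {
	return segment.GetLastExpireTime() > 0 && // the segment has a recorded allocation expire time
		segment.GetLastExpireTime() < cpTs && // the channel checkpoint has advanced past that expire time
		segment.currRows == 0 && // no rows tracked in memory
		segment.GetNumOfRows() == 0 // no rows recorded in meta
}
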
157 changes: 157 additions & 0 deletions internal/datacoord/segment_manager_test.go
@@ -28,12 +28,14 @@ import (
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/broker"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/util/etcd"
@@ -512,6 +514,15 @@ func TestGetFlushableSegments(t *testing.T) {
assert.EqualValues(t, allocations[0].SegmentID, ids[0])

meta.SetCurrentRows(allocations[0].SegmentID, 0)
positions := make([]*msgpb.MsgPosition, 0)
cpTs := allocations[0].ExpireTime + 1
positions = append(positions, &msgpb.MsgPosition{
ChannelName: "c1",
MsgID: []byte{1, 2, 3},
Timestamp: cpTs,
})
meta.UpdateChannelCheckpoints(context.TODO(), positions)
segmentManager.CleanZeroSealedSegmentsOfChannel(context.TODO(), "c1", cpTs)
ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime)
assert.NoError(t, err)
assert.Empty(t, ids)
@@ -902,3 +913,149 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
})
}
}

func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) {
partitionID := int64(100)
type fields struct {
meta *meta
segments []UniqueID
}
type args struct {
channel string
cpTs Timestamp
}

mockCatalog := mocks.NewDataCoordCatalog(t)
mockCatalog.EXPECT().AlterSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)

seg1 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Sealed,
NumOfRows: 1,
LastExpireTime: 100,
},
currRows: 1,
}
seg2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Sealed,
NumOfRows: 0,
LastExpireTime: 100,
},
currRows: 0,
}
seg3 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Sealed,
LastExpireTime: 90,
},
}
seg4 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 4,
PartitionID: partitionID,
InsertChannel: "ch2",
State: commonpb.SegmentState_Growing,
NumOfRows: 1,
LastExpireTime: 100,
},
currRows: 1,
}

newMetaFunc := func() *meta {
return &meta{
catalog: mockCatalog,
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: seg1,
2: seg2,
3: seg3,
4: seg4,
},
secondaryIndexes: segmentInfoIndexes{
coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
0: {1: seg1, 2: seg2, 3: seg3, 4: seg4},
},
channel2Segments: map[string]map[UniqueID]*SegmentInfo{
"ch1": {1: seg1, 2: seg2, 3: seg3},
"ch2": {4: seg4},
},
},
},
}
}

tests := []struct {
name string
fields fields
args args
want []UniqueID
}{
{
"test clean empty sealed segments with normal channel cp <= lastExpireTs",
fields{
meta: newMetaFunc(),
segments: []UniqueID{1, 2, 3, 4},
},
args{
"ch1", 100,
},
[]UniqueID{1, 2, 4},
},
{
"test clean empty sealed segments with normal channel cp > lastExpireTs",
fields{
meta: newMetaFunc(),
segments: []UniqueID{1, 2, 3, 4},
},
args{
"ch1", 101,
},
[]UniqueID{1, 4},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &SegmentManager{
meta: tt.fields.meta,
channelLock: lock.NewKeyLock[string](),
channel2Growing: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
channel2Sealed: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
}
for _, segmentID := range tt.fields.segments {
segmentInfo := tt.fields.meta.GetSegment(context.TODO(), segmentID)
channel := tt.args.channel
if segmentInfo != nil {
channel = segmentInfo.GetInsertChannel()
}
if segmentInfo == nil || segmentInfo.GetState() == commonpb.SegmentState_Growing {
growing, _ := s.channel2Growing.GetOrInsert(channel, typeutil.NewUniqueSet())
growing.Insert(segmentID)
} else if segmentInfo.GetState() == commonpb.SegmentState_Sealed {
sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet())
sealed.Insert(segmentID)
}
}
s.CleanZeroSealedSegmentsOfChannel(context.TODO(), tt.args.channel, tt.args.cpTs)
all := make([]int64, 0)
s.channel2Sealed.Range(func(_ string, segments typeutil.UniqueSet) bool {
all = append(all, segments.Collect()...)
return true
})
s.channel2Growing.Range(func(_ string, segments typeutil.UniqueSet) bool {
all = append(all, segments.Collect()...)
return true
})
assert.ElementsMatch(t, tt.want, all)
})
}
}
7 changes: 7 additions & 0 deletions internal/datacoord/services.go
@@ -1448,6 +1448,13 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update
return merr.Status(err), nil
}

for _, pos := range checkpoints {
if pos == nil || pos.GetMsgID() == nil || pos.GetChannelName() == "" {
continue
}
s.segmentManager.CleanZeroSealedSegmentsOfChannel(ctx, pos.GetChannelName(), pos.GetTimestamp())
}

return merr.Success(), nil
}

