Skip to content

Commit

Permalink
enhance: Remove bf from streaming node (milvus-io#35902)
Browse files Browse the repository at this point in the history
Remove the bloom filter (bf) from the streaming node:
1. When watching vchannels, skip loading bloom filters for segments.
2. Bypass bloom filter checks for delete messages, directly writing to
L0 segments.
3. Remove flushed segments proactively after flush.

issue: milvus-io#33285,
milvus-io#34585

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Sep 3, 2024
1 parent 325f198 commit 6130a85
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 9 deletions.
11 changes: 6 additions & 5 deletions internal/flushcommon/pipeline/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,12 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut
}
}

// init metaCache meta
metaCache, err := initMetaCache(initCtx, pipelineParams.ChunkManager, info, nil, unflushedSegmentInfos, flushedSegmentInfos)
if err != nil {
return nil, err
}
// In streaming service mode, flushed segments no longer maintain a bloom filter.
// So, here we skip loading the bloom filter for flushed segments.
info.Vchan.UnflushedSegments = unflushedSegmentInfos
metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})

return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flushcommon/syncmgr/key_lock_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"github.com/milvus-io/milvus/pkg/util/lock"
)

//go:generate mockery --name=Task --structname=MockTask --output=./ --filename=mock_task.go --with-expecter --inpackage

// Task is a single unit of sync work handled by the key-lock dispatcher.
// Implementations (e.g. SyncTask) carry the data and positions needed to
// persist one segment's buffered writes.
type Task interface {
	// SegmentID returns the ID of the segment this task operates on.
	SegmentID() int64
	// Checkpoint returns the position recorded as this task's checkpoint.
	Checkpoint() *msgpb.MsgPosition
	// StartPosition returns the start position associated with this task.
	StartPosition() *msgpb.MsgPosition
	// ChannelName returns the name of the channel this task belongs to.
	ChannelName() string
	// Run executes the task; it returns a non-nil error on failure.
	Run(context.Context) error
	// HandleError is invoked with the error produced by a failed run.
	HandleError(error)
	// IsFlush reports whether this task is a flush task.
	IsFlush() bool
}

type keyLockDispatcher[K comparable] struct {
Expand Down
41 changes: 41 additions & 0 deletions internal/flushcommon/syncmgr/mock_task.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions internal/flushcommon/syncmgr/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ func (t *SyncTask) ChannelName() string {
return t.channelName
}

// IsFlush reports whether this sync task is a flush task
// (mirrors the task's isFlush field set at construction).
func (t *SyncTask) IsFlush() bool {
	return t.isFlush
}

// Binlogs returns the binlogs held by this task: insert binlogs and
// stats binlogs keyed by field ID, plus the single delta binlog.
func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) {
	return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog
}
24 changes: 21 additions & 3 deletions internal/flushcommon/writebuffer/l0_write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
Expand Down Expand Up @@ -139,6 +140,17 @@ func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgs
})
}

// dispatchDeleteMsgsWithoutFilter buffers every delete message directly into
// the partition's L0 segment, without consulting any bloom filter. Used in
// streaming service mode, where flushed segments no longer maintain a bf.
func (wb *l0WriteBuffer) dispatchDeleteMsgsWithoutFilter(deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
	for _, delMsg := range deleteMsgs {
		// Resolve (or lazily obtain) the L0 segment for this partition first,
		// matching the original call order regardless of the delete payload.
		segmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
		primaryKeys := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
		timestamps := delMsg.GetTimestamps()
		if len(primaryKeys) == 0 {
			// Nothing to buffer for an empty delete batch.
			continue
		}
		wb.bufferDelete(segmentID, primaryKeys, timestamps, startPos, endPos)
	}
}

func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
wb.mut.Lock()
defer wb.mut.Unlock()
Expand All @@ -156,9 +168,15 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
}
}

// distribute delete msg
// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
if streamingutil.IsStreamingServiceEnabled() {
// In streaming service mode, flushed segments no longer maintain a bloom filter.
// So, here we skip filtering delete entries by bf.
wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos)
} else {
// distribute delete msg
// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
}

// update pk oracle
for _, inData := range groups {
Expand Down
6 changes: 6 additions & 0 deletions internal/flushcommon/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
Expand Down Expand Up @@ -345,6 +346,11 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
if syncTask.StartPosition() != nil {
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
}

if streamingutil.IsStreamingServiceEnabled() && syncTask.IsFlush() {
wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(syncTask.SegmentID()))
log.Info("flushed segment removed", zap.Int64("segmentID", syncTask.SegmentID()), zap.String("channel", syncTask.ChannelName()))
}
return nil
}))
}
Expand Down

0 comments on commit 6130a85

Please sign in to comment.