fix: segment stats may be inconsistent after wal closing
- The segment stats could be left in the stats manager after the WAL was closed if a growing segment was not dirty enough to be persisted.
- Change the error handling of WAL open to avoid a redundant manager API call.
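For context on the first point, a minimal standalone sketch of the fixed close flow (not part of this commit and not the Milvus API; all names below are illustrative): every segment is collected on close, only the dirty ones are persisted, and all stats registered under the closed pchannel are dropped in one call.

package main

import "fmt"

// segment is a stand-in for segmentAllocManager.
type segment struct {
	id    int64
	dirty bool // "dirty enough" to be worth persisting
}

// statsRegistry is a stand-in for the segment stats manager.
type statsRegistry struct {
	byPChannel map[string]map[int64]struct{} // pchannel -> registered segment IDs
}

// unregisterAllOnPChannel drops every stat on the pchannel and reports how many were removed.
func (r *statsRegistry) unregisterAllOnPChannel(pchannel string) int {
	n := len(r.byPChannel[pchannel])
	delete(r.byPChannel, pchannel)
	return n
}

// closePChannel mirrors the fixed Close flow: persist only dirty segments,
// but unregister the stats of every segment, dirty or not.
func closePChannel(r *statsRegistry, pchannel string, segments []segment) {
	persisted := 0
	for _, s := range segments {
		if s.dirty {
			persisted++ // only dirty segments are written back to the catalog
		}
	}
	removed := r.unregisterAllOnPChannel(pchannel)
	fmt.Printf("persisted=%d removedStats=%d\n", persisted, removed)
}

func main() {
	r := &statsRegistry{byPChannel: map[string]map[int64]struct{}{
		"pchannel-1": {1: {}, 2: {}},
	}}
	// Segment 2 is growing but not dirty; before the fix its stats leaked on close.
	closePChannel(r, "pchannel-1", []segment{{id: 1, dirty: true}, {id: 2, dirty: false}})
}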

Signed-off-by: chyezh <[email protected]>
chyezh committed Jan 25, 2025
1 parent e61a841 commit 6553911
Showing 10 changed files with 155 additions and 57 deletions.
4 changes: 4 additions & 0 deletions internal/streamingcoord/client/assignment/assignment_test.go
@@ -95,6 +95,10 @@ func TestAssignmentService(t *testing.T) {

assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test"))

// Repeated report error at the same term should be ignored.
assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test"))
assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test"))

// test close
go close(closeCh)
time.Sleep(10 * time.Millisecond)
46 changes: 32 additions & 14 deletions internal/streamingcoord/client/assignment/discoverer.go
@@ -14,27 +14,29 @@ import (
// newAssignmentDiscoverClient creates a new assignment discover client.
func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient) *assignmentDiscoverClient {
c := &assignmentDiscoverClient{
lifetime: typeutil.NewLifetime(),
w: w,
streamClient: streamClient,
logger: log.With(),
requestCh: make(chan *streamingpb.AssignmentDiscoverRequest, 16),
exitCh: make(chan struct{}),
wg: sync.WaitGroup{},
lifetime: typeutil.NewLifetime(),
w: w,
streamClient: streamClient,
logger: log.With(),
requestCh: make(chan *streamingpb.AssignmentDiscoverRequest, 16),
exitCh: make(chan struct{}),
wg: sync.WaitGroup{},
lastErrorReportedTerm: make(map[string]int64),
}
c.executeBackgroundTask()
return c
}

// assignmentDiscoverClient is the client for assignment discover.
type assignmentDiscoverClient struct {
lifetime *typeutil.Lifetime
w *watcher
logger *log.MLogger
requestCh chan *streamingpb.AssignmentDiscoverRequest
exitCh chan struct{}
wg sync.WaitGroup
streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient
lifetime *typeutil.Lifetime
w *watcher
logger *log.MLogger
requestCh chan *streamingpb.AssignmentDiscoverRequest
exitCh chan struct{}
wg sync.WaitGroup
streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient
lastErrorReportedTerm map[string]int64
}

// ReportAssignmentError reports the assignment error to server.
@@ -101,12 +103,28 @@ func (c *assignmentDiscoverClient) sendLoop() (err error) {
}
return c.streamClient.CloseSend()
}
if c.shouldIgnore(req) {
continue
}
if err := c.streamClient.Send(req); err != nil {
return err
}
}
}

// shouldIgnore checks if the request should be ignored.
func (c *assignmentDiscoverClient) shouldIgnore(req *streamingpb.AssignmentDiscoverRequest) bool {
switch req := req.Command.(type) {
case *streamingpb.AssignmentDiscoverRequest_ReportError:
if term, ok := c.lastErrorReportedTerm[req.ReportError.Pchannel.Name]; ok && req.ReportError.Pchannel.Term <= term {
// If an error at the same or a newer term has already been reported, ignore this one.
return true
}
c.lastErrorReportedTerm[req.ReportError.Pchannel.Name] = req.ReportError.Pchannel.Term
}
return false
}

// recvLoop receives the message from server.
// 1. FullAssignment
// 2. Close
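The new shouldIgnore path keeps one "last reported term" per pchannel and drops any report-error request whose term is not newer, so repeated ReportAssignmentError calls at the same term result in a single send to the coordinator. A standalone sketch of that pattern (illustrative names, not the Milvus types):

package main

import "fmt"

// termDeduper mirrors the lastErrorReportedTerm bookkeeping in the discover client.
type termDeduper struct {
	lastReported map[string]int64 // pchannel name -> highest term already reported
}

// shouldIgnore returns true when an error for this pchannel was already
// reported at the same or a newer term; otherwise it records the term.
func (d *termDeduper) shouldIgnore(pchannel string, term int64) bool {
	if last, ok := d.lastReported[pchannel]; ok && term <= last {
		return true
	}
	d.lastReported[pchannel] = term
	return false
}

func main() {
	d := &termDeduper{lastReported: make(map[string]int64)}
	fmt.Println(d.shouldIgnore("c1", 1)) // false: first report at term 1 is sent
	fmt.Println(d.shouldIgnore("c1", 1)) // true:  repeated report at term 1 is dropped
	fmt.Println(d.shouldIgnore("c1", 2)) // false: a newer term is reported again
}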
2 changes: 1 addition & 1 deletion internal/streamingcoord/server/balancer/balancer_impl.go
@@ -226,7 +226,7 @@ func (b *balancerImpl) applyBalanceResultToStreamingNode(ctx context.Context, mo

// assign the channel to the target node.
if err := resource.Resource().StreamingNodeManagerClient().Assign(ctx, channel.CurrentAssignment()); err != nil {
b.logger.Warn("fail to assign channel", zap.Any("assignment", channel.CurrentAssignment()))
b.logger.Warn("fail to assign channel", zap.Any("assignment", channel.CurrentAssignment()), zap.Error(err))

return err
}
b.logger.Info("assign channel success", zap.Any("assignment", channel.CurrentAssignment()))
internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go
@@ -4,9 +4,12 @@ import (
"context"
"time"

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -121,9 +124,16 @@ func (s *sealOperationInspectorImpl) background() {
return true
})
case <-mustSealTicker.C:
segmentBelongs := resource.Resource().SegmentAssignStatsManager().SealByTotalGrowingSegmentsSize()
threshold := paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.GetAsUint64() * 1024 * 1024
segmentBelongs := resource.Resource().SegmentAssignStatsManager().SealByTotalGrowingSegmentsSize(threshold)
if segmentBelongs == nil {
continue
}
log.Info("seal by total growing segments size", zap.String("vchannel", segmentBelongs.VChannel),
zap.Uint64("sealThreshold", threshold),
zap.Int64("sealSegment", segmentBelongs.SegmentID))

if pm, ok := s.managers.Get(segmentBelongs.PChannel); ok {
pm.MustSealSegments(s.taskNotifier.Context(), segmentBelongs)
pm.MustSealSegments(s.taskNotifier.Context(), *segmentBelongs)

}
}
}
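Two small API changes show up in this hunk: the growing-size threshold is now computed by the caller from DataCoordCfg.GrowingSegmentsMemSizeInMB instead of being read inside the stats manager, and SealByTotalGrowingSegmentsSize returns a pointer that is nil when nothing crosses the threshold. A rough sketch of the caller-side shape (hypothetical names, not the real API):

package main

import "fmt"

// sealCandidate stands in for stats.SegmentBelongs.
type sealCandidate struct {
	vchannel  string
	segmentID int64
}

// pickSegmentToSeal stands in for SealByTotalGrowingSegmentsSize: it returns the
// largest growing segment if the total growing size reaches the threshold, else nil.
func pickSegmentToSeal(growingSizes map[int64]uint64, threshold uint64) *sealCandidate {
	var total, largestSize uint64
	var largestID int64
	for id, size := range growingSizes {
		total += size
		if size > largestSize {
			largestID, largestSize = id, size
		}
	}
	if total < threshold {
		return nil
	}
	return &sealCandidate{vchannel: "vchannel-1", segmentID: largestID}
}

func main() {
	growingSegmentsMemSizeInMB := uint64(1)               // value normally read from paramtable
	threshold := growingSegmentsMemSizeInMB * 1024 * 1024 // convert MB to bytes, as the inspector now does
	if c := pickSegmentToSeal(map[int64]uint64{3: 300, 4: 600}, threshold); c != nil {
		fmt.Println("must seal segment", c.segmentID)
	} else {
		fmt.Println("below threshold, nothing to seal") // nil now means "skip", instead of a zero value
	}
}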
@@ -164,19 +164,14 @@ func (m *partitionSegmentManager) collectShouldBeSealedWithPolicy(predicates fun
return shouldBeSealedSegments
}

// CollectDirtySegmentsAndClear collects all segments in the manager and clear the maanger.
func (m *partitionSegmentManager) CollectDirtySegmentsAndClear() []*segmentAllocManager {
// CollectAllSegmentsAndClear collects all segments in the manager and clear the manager.
func (m *partitionSegmentManager) CollectAllSegmentsAndClear() []*segmentAllocManager {
m.mu.Lock()
defer m.mu.Unlock()

dirtySegments := make([]*segmentAllocManager, 0, len(m.segments))
for _, segment := range m.segments {
if segment.IsDirtyEnough() {
dirtySegments = append(dirtySegments, segment)
}
}
m.segments = make([]*segmentAllocManager, 0)
return dirtySegments
segments := m.segments
m.segments = nil
return segments
}

// CollectAllCanBeSealedAndClear collects all segments that can be sealed and clear the manager.
internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go
@@ -265,31 +265,35 @@ func (m *PChannelSegmentAllocManager) Close(ctx context.Context) {

// Try to seal all wait
m.helper.SealAllWait(ctx)
m.logger.Info("seal all waited segments done", zap.Int("waitCounter", m.helper.WaitCounter()))
m.logger.Info("seal all waited segments done, may be some not done here", zap.Int("waitCounter", m.helper.WaitCounter()))

segments := make([]*segmentAllocManager, 0)
m.managers.Range(func(pm *partitionSegmentManager) {
segments = append(segments, pm.CollectDirtySegmentsAndClear()...)
segments = append(segments, pm.CollectAllSegmentsAndClear()...)
})

// commitAllSegmentsOnSamePChannel commits all segments on the same pchannel.
// Try to seal the dirty segment to avoid generate too large segment.
protoSegments := make([]*streamingpb.SegmentAssignmentMeta, 0, len(segments))
growingCnt := 0
for _, segment := range segments {
protoSegments = append(protoSegments, segment.Snapshot())
if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING {
growingCnt++
}
if segment.IsDirtyEnough() {
// Only persist the dirty segment.
protoSegments = append(protoSegments, segment.Snapshot())
}

}

m.logger.Info("segment assignment manager save all dirty segment assignments info", zap.Int("segmentCount", len(protoSegments)))
m.logger.Info("segment assignment manager save all dirty segment assignments info",
zap.Int("dirtySegmentCount", len(protoSegments)),
zap.Int("growingSegmentCount", growingCnt),
zap.Int("segmentCount", len(segments)))
if err := resource.Resource().StreamingNodeCatalog().SaveSegmentAssignments(ctx, m.pchannel.Name, protoSegments); err != nil {
m.logger.Warn("commit segment assignment at pchannel failed", zap.Error(err))
}

// remove the stats from stats manager.
m.logger.Info("segment assignment manager remove all segment stats from stats manager")
for _, segment := range segments {
if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING {
resource.Resource().SegmentAssignStatsManager().UnregisterSealedSegment(segment.GetSegmentID())
}
}

removedStatsSegmentCnt := resource.Resource().SegmentAssignStatsManager().UnregisterAllStatsOnPChannel(m.pchannel.Name)
m.logger.Info("segment assignment manager remove all segment stats from stats manager", zap.Int("removedStatsSegmentCount", removedStatsSegmentCnt))
m.metrics.Close()
}
internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go
@@ -5,10 +5,6 @@ import (
"sync"

"github.com/cockroachdb/errors"
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/util/paramtable"
)

var (
@@ -24,8 +20,9 @@ type StatsManager struct {
totalStats InsertMetrics
pchannelStats map[string]*InsertMetrics
vchannelStats map[string]*InsertMetrics
segmentStats map[int64]*SegmentStats // map[SegmentID]SegmentStats
segmentIndex map[int64]SegmentBelongs // map[SegmentID]channels
segmentStats map[int64]*SegmentStats // map[SegmentID]SegmentStats
segmentIndex map[int64]SegmentBelongs // map[SegmentID]channels
pchannelIndex map[string]map[int64]struct{} // map[PChannel]SegmentID
sealNotifier *SealSignalNotifier
}

@@ -46,6 +43,7 @@ func NewStatsManager() *StatsManager {
vchannelStats: make(map[string]*InsertMetrics),
segmentStats: make(map[int64]*SegmentStats),
segmentIndex: make(map[int64]SegmentBelongs),
pchannelIndex: make(map[string]map[int64]struct{}),
sealNotifier: NewSealSignalNotifier(),
}
}
@@ -62,6 +60,10 @@ func (m *StatsManager) RegisterNewGrowingSegment(belongs SegmentBelongs, segment

m.segmentStats[segmentID] = stats
m.segmentIndex[segmentID] = belongs
if _, ok := m.pchannelIndex[belongs.PChannel]; !ok {
m.pchannelIndex[belongs.PChannel] = make(map[int64]struct{})
}
m.pchannelIndex[belongs.PChannel][segmentID] = struct{}{}
m.totalStats.Collect(stats.Insert)
if _, ok := m.pchannelStats[belongs.PChannel]; !ok {
m.pchannelStats[belongs.PChannel] = &InsertMetrics{}
@@ -145,6 +147,10 @@ func (m *StatsManager) UnregisterSealedSegment(segmentID int64) *SegmentStats {
m.mu.Lock()
defer m.mu.Unlock()

return m.unregisterSealedSegment(segmentID)
}

func (m *StatsManager) unregisterSealedSegment(segmentID int64) *SegmentStats {
// Must be exist, otherwise it's a bug.
info, ok := m.segmentIndex[segmentID]
if !ok {
@@ -156,6 +162,13 @@ func (m *StatsManager) UnregisterSealedSegment(segmentID int64) *SegmentStats {
m.totalStats.Subtract(stats.Insert)
delete(m.segmentStats, segmentID)
delete(m.segmentIndex, segmentID)
if _, ok := m.pchannelIndex[info.PChannel]; ok {
delete(m.pchannelIndex[info.PChannel], segmentID)
if len(m.pchannelIndex[info.PChannel]) == 0 {
delete(m.pchannelIndex, info.PChannel)
}
}

if _, ok := m.pchannelStats[info.PChannel]; ok {
m.pchannelStats[info.PChannel].Subtract(stats.Insert)
if m.pchannelStats[info.PChannel].BinarySize == 0 {
@@ -171,15 +184,29 @@ func (m *StatsManager) UnregisterSealedSegment(segmentID int64) *SegmentStats {
return stats
}

// UnregisterAllStatsOnPChannel unregisters all stats on pchannel.
func (m *StatsManager) UnregisterAllStatsOnPChannel(pchannel string) int {
m.mu.Lock()
defer m.mu.Unlock()

segmentIDs, ok := m.pchannelIndex[pchannel]
if !ok {
return 0
}
for segmentID := range segmentIDs {
m.unregisterSealedSegment(segmentID)
}
return len(segmentIDs)
}

// SealByTotalGrowingSegmentsSize seals the largest growing segment
// if the total size of growing segments in ANY vchannel exceeds the threshold.
func (m *StatsManager) SealByTotalGrowingSegmentsSize() SegmentBelongs {
func (m *StatsManager) SealByTotalGrowingSegmentsSize(vchannelThreshold uint64) *SegmentBelongs {
m.mu.Lock()
defer m.mu.Unlock()

for vchannel, metrics := range m.vchannelStats {
threshold := paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.GetAsUint64() * 1024 * 1024
if metrics.BinarySize >= threshold {
for _, metrics := range m.vchannelStats {
if metrics.BinarySize >= vchannelThreshold {
var (
largestSegment int64 = 0
largestSegmentSize uint64 = 0
@@ -190,13 +217,14 @@ func (m *StatsManager) SealByTotalGrowingSegmentsSize() SegmentBelongs {
largestSegment = segmentID
}
}
log.Info("seal by total growing segments size", zap.String("vchannel", vchannel),
zap.Uint64("vchannelGrowingSize", metrics.BinarySize), zap.Uint64("sealThreshold", threshold),
zap.Int64("sealSegment", largestSegment), zap.Uint64("sealSegmentSize", largestSegmentSize))
return m.segmentIndex[largestSegment]
belongs, ok := m.segmentIndex[largestSegment]
if !ok {
panic("unrechable: the segmentID should always be found in segmentIndex")

}
return &belongs
}
}
return SegmentBelongs{}
return nil
}

// InsertOpeatationMetrics is the metrics of insert operation.
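The key data-structure change in stats_manager.go is the new pchannelIndex, a reverse index from pchannel name to the set of registered segment IDs: registration inserts into it, per-segment unregistration removes from it, and UnregisterAllStatsOnPChannel then only touches the segments of the closed pchannel instead of scanning everything. A compact sketch of that bookkeeping (illustrative types, not the real ones):

package main

import "fmt"

type statsIndex struct {
	segmentOwner  map[int64]string              // segment ID -> pchannel
	pchannelIndex map[string]map[int64]struct{} // pchannel -> registered segment IDs
}

func newStatsIndex() *statsIndex {
	return &statsIndex{
		segmentOwner:  make(map[int64]string),
		pchannelIndex: make(map[string]map[int64]struct{}),
	}
}

// register records a growing segment under its pchannel.
func (s *statsIndex) register(pchannel string, segmentID int64) {
	s.segmentOwner[segmentID] = pchannel
	if _, ok := s.pchannelIndex[pchannel]; !ok {
		s.pchannelIndex[pchannel] = make(map[int64]struct{})
	}
	s.pchannelIndex[pchannel][segmentID] = struct{}{}
}

// unregister removes one segment and cleans up an empty pchannel entry.
func (s *statsIndex) unregister(segmentID int64) {
	pchannel, ok := s.segmentOwner[segmentID]
	if !ok {
		return
	}
	delete(s.segmentOwner, segmentID)
	delete(s.pchannelIndex[pchannel], segmentID)
	if len(s.pchannelIndex[pchannel]) == 0 {
		delete(s.pchannelIndex, pchannel)
	}
}

// unregisterAllOnPChannel removes every segment of one pchannel in O(k).
func (s *statsIndex) unregisterAllOnPChannel(pchannel string) int {
	ids := s.pchannelIndex[pchannel]
	removed := len(ids) // capture before the per-segment deletes empty this map
	for id := range ids {
		s.unregister(id)
	}
	return removed
}

func main() {
	s := newStatsIndex()
	s.register("p1", 3)
	s.register("p1", 4)
	s.register("p2", 5)
	fmt.Println(s.unregisterAllOnPChannel("p1")) // 2
	fmt.Println(len(s.segmentOwner))             // 1: only segment 5 on p2 remains
}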
@@ -106,6 +106,25 @@ func TestStatsManager(t *testing.T) {
assert.Panics(t, func() {
m.UnregisterSealedSegment(1)
})
m.UnregisterAllStatsOnPChannel("pchannel")
m.UnregisterAllStatsOnPChannel("pchannel2")
}

func TestSealByTotalGrowingSegmentsSize(t *testing.T) {
m := NewStatsManager()
m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 3}, 3, createSegmentStats(100, 100, 300))
m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 4}, 4, createSegmentStats(100, 200, 300))
m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 5}, 5, createSegmentStats(100, 100, 300))
belongs := m.SealByTotalGrowingSegmentsSize(401)
assert.Nil(t, belongs)
belongs = m.SealByTotalGrowingSegmentsSize(400)
assert.NotNil(t, belongs)
assert.Equal(t, int64(4), belongs.SegmentID)
m.UnregisterAllStatsOnPChannel("pchannel")
assert.Empty(t, m.pchannelStats)
assert.Empty(t, m.vchannelStats)
assert.Empty(t, m.segmentStats)
assert.Empty(t, m.segmentIndex)
}

func createSegmentStats(row uint64, binarySize uint64, maxBinarSize uint64) *SegmentStats {
14 changes: 13 additions & 1 deletion internal/streamingnode/server/walmanager/manager_impl.go
@@ -101,7 +101,9 @@ func (m *managerImpl) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, erro
if currentTerm != channel.Term {
return nil, status.NewUnmatchedChannelTerm(channel.Name, channel.Term, currentTerm)
}
return l, nil
// wal's lifetime is fully managed by wal manager,
// so wrap the wal instance to prevent it from being closed by other components.
return nopCloseWAL{l}, nil
}

// GetAllAvailableChannels returns all available channel info.
@@ -176,3 +178,13 @@ func isRemoveable(state managerState) bool {
func isOpenable(state managerState) bool {
return state&managerOpenable != 0
}

// wal can be only closed by the wal manager.
// So wrap the wal instance to prevent it from being closed by other components.
type nopCloseWAL struct {
wal.WAL
}

func (w nopCloseWAL) Close() {
// do nothing

}
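nopCloseWAL uses a common Go idiom: embed the interface value and override only Close with a no-op, so callers that receive the WAL from GetAvailableWAL cannot shut down an instance whose lifetime the manager owns. A generic sketch of the idiom with a hypothetical interface (not the wal.WAL API):

package main

import "fmt"

// resource is a hypothetical interface whose lifetime is owned by a manager.
type resource interface {
	Name() string
	Close()
}

type realResource struct{ name string }

func (r *realResource) Name() string { return r.name }
func (r *realResource) Close()       { fmt.Println("really closed", r.name) }

// nopCloseResource embeds the interface and neutralizes Close, mirroring nopCloseWAL.
type nopCloseResource struct {
	resource
}

// Close does nothing: only the owning manager may close the underlying resource.
func (nopCloseResource) Close() {}

func main() {
	r := &realResource{name: "wal-pchannel-1"}
	handed := nopCloseResource{resource: r} // what the manager hands out

	handed.Close()             // no-op: the caller cannot tear the resource down
	fmt.Println(handed.Name()) // other methods still delegate to the embedded value

	r.Close() // the manager itself still closes the real instance
}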
10 changes: 9 additions & 1 deletion internal/streamingnode/server/walmanager/wal_lifetime.go
@@ -3,6 +3,7 @@ package walmanager
import (
"context"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
@@ -72,7 +73,14 @@ func (w *walLifetime) Remove(ctx context.Context, term int64) error {
}

// Wait until the WAL state is ready or term expired or error occurs.
return w.statePair.WaitCurrentStateReachExpected(ctx, expected)
err := w.statePair.WaitCurrentStateReachExpected(ctx, expected)
if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) {
return err
}
if err != nil {
w.logger.Info("remove wal success because that previous open operation is failure", zap.NamedError("previousOpenError", err))
}

return nil
}

// Close closes the wal lifetime.
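The reworked Remove distinguishes two failure modes of the state wait: a context cancellation or deadline still fails the call, while any other error just means the previous open failed, so the removal is logged and treated as done, avoiding a redundant retry through the manager. A sketch of that classification using the standard library errors package (the commit itself uses cockroachdb/errors.IsAny for the same check):

package main

import (
	"context"
	"errors"
	"fmt"
)

// classifyWaitError mirrors the new error handling in walLifetime.Remove.
func classifyWaitError(err error) error {
	if err == nil {
		return nil // the expected state was reached normally
	}
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return err // the caller's context expired: propagate the failure
	}
	// Any other error came from a failed previous open; removal still succeeds.
	fmt.Println("remove wal success because the previous open operation failed:", err)
	return nil
}

func main() {
	fmt.Println(classifyWaitError(nil))                       // <nil>
	fmt.Println(classifyWaitError(errors.New("open failed"))) // <nil>, after logging
	fmt.Println(classifyWaitError(context.Canceled))          // context canceled
}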
