Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REP-5358 Add additional retryer updates. #78

Merged
merged 19 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/partitions/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ func getMidIDBounds(

// Append the copied bound to the other mid _id bounds.
midIDBounds = append(midIDBounds, bound)
ri.NoteSuccess()
ri.NoteSuccess("received an ID partition")
}

return cursor.Err()
Expand Down
64 changes: 43 additions & 21 deletions internal/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ func (r *Retryer) runRetryLoop(
li := &LoopInfo{
durationLimit: r.retryLimit,
}
funcinfos := lo.RepeatBy(
len(r.callbacks),
func(_ int) *FuncInfo {
funcinfos := lo.Map(
r.callbacks,
func(cb retryCallbackInfo, _ int) *FuncInfo {
return &FuncInfo{
lastResetTime: msync.NewTypedAtomic(startTime),
lastReset: msync.NewTypedAtomic(lastResetInfo{
time: startTime,
}),
description: cb.description,
loopDescription: r.description,
loopInfo: li,
}
Expand Down Expand Up @@ -113,17 +116,25 @@ func (r *Retryer) runRetryLoop(
defer ticker.Stop()

for {
lastSuccessTime := funcinfos[i].lastResetTime.Load()
lastReset := funcinfos[i].lastReset.Load()

select {
case <-cbDoneChan:
return
case <-ticker.C:
if funcinfos[i].lastResetTime.Load() == lastSuccessTime {
logger.Warn().
Str("callbackDescription", curCbInfo.description).
Time("lastSuccessAt", lastSuccessTime).
Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastSuccessTime))).
if funcinfos[i].lastReset.Load() == lastReset {
event := logger.Warn().
Strs("description", funcinfos[i].GetDescriptions()).
Time("noSuccessSince", lastReset.time).
Uint64("successesSoFar", lastReset.resetsSoFar)

if successDesc, hasDesc := lastReset.description.Get(); hasDesc {
event.
Str("lastSuccessDescription", successDesc)
}

event.
Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastReset.time))).
Msg("Operation has not reported success for a while.")
}
}
Expand Down Expand Up @@ -164,9 +175,11 @@ func (r *Retryer) runRetryLoop(
}

failedFuncInfo := funcinfos[groupErr.funcNum]
descriptions := failedFuncInfo.GetDescriptions()
cbErr := groupErr.errFromCallback

// Not a transient error? Fail immediately.
if !r.shouldRetryWithSleep(logger, sleepTime, *failedFuncInfo, groupErr.errFromCallback) {
if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) {
return groupErr.errFromCallback
}

Expand Down Expand Up @@ -201,7 +214,7 @@ func (r *Retryer) runRetryLoop(
// Set all of the funcs that did *not* fail as having just succeeded.
for i, curInfo := range funcinfos {
if i != groupErr.funcNum {
curInfo.lastResetTime.Store(now)
curInfo.lastReset.Store(lastResetInfo{time: now})
}
}
}
Expand Down Expand Up @@ -235,7 +248,7 @@ func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event {
func (r *Retryer) shouldRetryWithSleep(
logger *logger.Logger,
sleepTime time.Duration,
funcinfo FuncInfo,
descriptions []string,
err error,
) bool {
if err == nil {
Expand All @@ -250,26 +263,35 @@ func (r *Retryer) shouldRetryWithSleep(
)

event := logger.WithLevel(
lo.Ternary(isTransient, zerolog.InfoLevel, zerolog.WarnLevel),
lo.Ternary(
// If it’s transient, surface it as info.
isTransient,
zerolog.InfoLevel,

lo.Ternary(
// Context cancellation is unimportant, so debug.
errors.Is(err, context.Canceled),
zerolog.DebugLevel,

// Other non-retryables are serious, so warn.
zerolog.WarnLevel,
),
),
)

if loopDesc, hasLoopDesc := r.description.Get(); hasLoopDesc {
event.Str("operationDescription", loopDesc)
}

event.Str("callbackDescription", funcinfo.description).
event.Strs("description", descriptions).
Int("error code", util.GetErrorCode(err)).
Err(err)

if isTransient {
event.
Stringer("delay", sleepTime).
Msg("Pausing before retrying after transient error.")
Msg("Got retryable error. Pausing, then will retry.")

return true
}

event.Msg("Non-transient error occurred.")
event.Msg("Non-retryable error occurred.")

return false
}
35 changes: 31 additions & 4 deletions internal/retry/retry_info.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package retry

import (
"fmt"
"slices"
"time"

"github.com/10gen/migration-verifier/internal/reportutils"
"github.com/10gen/migration-verifier/mslices"
"github.com/10gen/migration-verifier/msync"
"github.com/10gen/migration-verifier/option"
"github.com/rs/zerolog"
Expand All @@ -19,11 +22,20 @@ type LoopInfo struct {
durationLimit time.Duration
}

// lastResetInfo is the value held in FuncInfo's lastReset atomic. It records
// when a callback's no-success timer was last reset, along with details that
// are only used to enrich log output.
type lastResetInfo struct {
// time is the moment of the reset: the retry loop's start time initially,
// and time.Now() whenever NoteSuccess is called.
time time.Time

// These go into logs to facilitate debugging.

// description is the formatted message passed to the most recent
// NoteSuccess call. It is empty when the value was stored without one.
description option.Option[string]

// resetsSoFar is incremented by NoteSuccess on each call.
// NOTE(review): the retry loop also stores a fresh value with only `time`
// set for callbacks that did not fail, which appears to zero this count;
// confirm that is intended.
resetsSoFar uint64
}

type FuncInfo struct {
loopInfo *LoopInfo
description string
loopDescription option.Option[string]
lastResetTime *msync.TypedAtomic[time.Time]

lastReset *msync.TypedAtomic[lastResetInfo]
}

// Log will log a debug-level message for the current Info values and the provided strings.
Expand Down Expand Up @@ -69,7 +81,7 @@ func (fi *FuncInfo) GetAttemptNumber() int {
// GetDurationSoFar returns how much time has elapsed since the last reset.
// This duration applies only to retrying after transient errors.
func (fi *FuncInfo) GetDurationSoFar() time.Duration {
return time.Since(fi.lastResetTime.Load())
return time.Since(fi.lastReset.Load().time)
}

// NoteSuccess is used to tell the retry util to reset its measurement
Expand All @@ -78,6 +90,21 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration {
//
// Call this after every successful command in a multi-command callback.
// (It’s useless--but harmless--in a single-command callback.)
func (i *FuncInfo) NoteSuccess() {
i.lastResetTime.Store(time.Now())
func (fi *FuncInfo) NoteSuccess(description string, descArgs ...any) {
	// Read the previous snapshot first so the running count carries forward.
	previous := fi.lastReset.Load()

	// description is a printf-style format; descArgs fill in its verbs.
	note := fmt.Sprintf(description, descArgs...)

	updated := lastResetInfo{
		description: option.Some(note),
		time:        time.Now(),
		resetsSoFar: previous.resetsSoFar + 1,
	}

	fi.lastReset.Store(updated)
}

// GetDescriptions returns the descriptions that identify this callback in
// logs: the callback's own description, preceded by the enclosing retry
// loop's description when one was set.
func (fi *FuncInfo) GetDescriptions() []string {
	result := mslices.Of(fi.description)

	loopDesc, hasLoopDesc := fi.loopDescription.Get()
	if !hasLoopDesc {
		return result
	}

	// The loop-level description goes first, as the broader context.
	return slices.Insert(result, 0, loopDesc)
}
17 changes: 8 additions & 9 deletions internal/retry/retryer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/option"
"go.mongodb.org/mongo-driver/mongo"
)

Expand Down Expand Up @@ -99,8 +100,11 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
noSuccessIterations := 0
f1 := func(_ context.Context, ri *FuncInfo) error {
// Artificially advance how much time was taken.
ri.lastResetTime.Store(
ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit),
ri.lastReset.Store(
lastResetInfo{
time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit),
description: option.Some("artificially rewinding time"),
},
)

noSuccessIterations++
Expand All @@ -123,12 +127,7 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
// duration should be reset.
successIterations := 0
f2 := func(_ context.Context, ri *FuncInfo) error {
// Artificially advance how much time was taken.
ri.lastResetTime.Store(
ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit),
)

ri.NoteSuccess()
ri.NoteSuccess("immediate success")

successIterations++
if successIterations == 1 {
Expand Down Expand Up @@ -307,7 +306,7 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() {

err := retryer.WithCallback(
func(ctx context.Context, fi *FuncInfo) error {
fi.NoteSuccess()
fi.NoteSuccess("success right away")

if time.Now().Before(succeedPastTime) {
time.Sleep(1 * time.Second)
Expand Down
14 changes: 10 additions & 4 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
}

if changeEventBatch == nil {
changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1)
batchSize := cs.RemainingBatchLength() + 1

ri.NoteSuccess("received a batch of %d change event(s)", batchSize)

changeEventBatch = make([]ParsedEvent, batchSize)
}

if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil {
Expand All @@ -303,9 +307,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
eventsRead++
}

ri.NoteSuccess()

if eventsRead == 0 {
ri.NoteSuccess("received an empty change stream response")

return nil
}

Expand Down Expand Up @@ -432,7 +436,9 @@ func (csr *ChangeStreamReader) iterateChangeStream(
infoLog = infoLog.Interface("lastEventTime", *csr.lastChangeEventTime)
}

infoLog.Msg("Change stream is done.")
infoLog.
Stringer("reader", csr).
Msg("Change stream reader is done.")

return nil
}
Expand Down
7 changes: 4 additions & 3 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
)

if err == nil {
state.NoteSuccess()
state.NoteSuccess("opened src find cursor")

err = errors.Wrap(
iterateCursorToChannel(ctx, state, cursor, srcChannel),
Expand All @@ -350,7 +350,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
)

if err == nil {
state.NoteSuccess()
state.NoteSuccess("opened dst find cursor")

err = errors.Wrap(
iterateCursorToChannel(ctx, state, cursor, dstChannel),
Expand Down Expand Up @@ -378,12 +378,13 @@ func iterateCursorToChannel(
defer close(writer)

for cursor.Next(ctx) {
state.NoteSuccess()
state.NoteSuccess("received a document")

select {
case <-ctx.Done():
return ctx.Err()
case writer <- slices.Clone(cursor.Current):
state.NoteSuccess("sent document to compare thread")
}
}

Expand Down
Loading