Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
FGasper committed Dec 10, 2024
1 parent 4a9dc05 commit 6ddd395
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
5 changes: 3 additions & 2 deletions internal/retry/retry_info.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package retry

import (
"fmt"
"slices"
"time"

Expand Down Expand Up @@ -89,11 +90,11 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration {
//
// Call this after every successful command in a multi-command callback.
// (It’s useless--but harmless--in a single-command callback.)
func (i *FuncInfo) NoteSuccess(description string) {
func (i *FuncInfo) NoteSuccess(description string, descArgs ...any) {
totalResets := i.lastReset.Load().resetsSoFar

i.lastReset.Store(lastResetInfo{
description: option.Some(description),
description: option.Some(fmt.Sprintf(description, descArgs...)),
time: time.Now(),
resetsSoFar: 1 + totalResets,
})
Expand Down
14 changes: 10 additions & 4 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
}

if changeEventBatch == nil {
changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1)
batchSize := cs.RemainingBatchLength() + 1

ri.NoteSuccess("received a batch of %d change event(s)", batchSize)

changeEventBatch = make([]ParsedEvent, batchSize)
}

if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil {
Expand All @@ -303,9 +307,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
eventsRead++
}

ri.NoteSuccess("received a batch of change events")

if eventsRead == 0 {
ri.NoteSuccess("received an empty change stream response")

return nil
}

Expand Down Expand Up @@ -432,7 +436,9 @@ func (csr *ChangeStreamReader) iterateChangeStream(
infoLog = infoLog.Interface("lastEventTime", *csr.lastChangeEventTime)
}

infoLog.Msg("Change stream is done.")
infoLog.
Stringer("reader", csr).
Msg("Change stream reader is done.")

return nil
}
Expand Down

0 comments on commit 6ddd395

Please sign in to comment.