Commit

improve tbwr error logging
qrort committed Jan 10, 2025
1 parent ff196bd commit ed7f889
Showing 2 changed files with 32 additions and 16 deletions.
47 changes: 31 additions & 16 deletions internal/handlers/take_backup_retry.go
@@ -159,6 +159,7 @@ func setErrorToRetryOperation(
tbwr *types.TakeBackupWithRetryOperation,
ops []types.Operation,
clock clockwork.Clock,
isEmptyDb bool,
) {
operationIDs := strings.Join(func() []string {
var ids []string
@@ -171,16 +172,34 @@ func setErrorToRetryOperation(
now := clock.Now()
tbwr.UpdatedAt = timestamppb.New(now)
tbwr.Audit.CompletedAt = timestamppb.New(now)

tbwr.Message = fmt.Sprintf("retry attempts exceeded limit: %d.", tbwr.Retries)
fields := []zap.Field{
zap.Int("RetriesCount", len(ops)),
}
if len(ops) > 0 {
tbwr.Message = tbwr.Message + fmt.Sprintf(" Launched operations %s", operationIDs)
fields = append(fields, zap.String("OperationIDs", operationIDs))

if isEmptyDb {
tbwr.Message = "empty database"
} else {
if tbwr.RetryConfig != nil {
switch tbwr.RetryConfig.Retries.(type) {
case *pb.RetryConfig_Count:
{
tbwr.Message = fmt.Sprintf("retry attempts exceeded limit: %d.", tbwr.Retries)
}
case *pb.RetryConfig_MaxBackoff:
{
tbwr.Message = fmt.Sprintf("retry attempts exceeded backoff duration.")
}
}
} else {
tbwr.Message = fmt.Sprint("retry attempts exceeded limit: 1.")
}

if len(ops) > 0 {
tbwr.Message = tbwr.Message + fmt.Sprintf(" Launched operations %s", operationIDs)
fields = append(fields, zap.String("OperationIDs", operationIDs))
}
}
xlog.Error(ctx, "retry attempts exceeded limit for TBWR operation", fields...)
xlog.Error(ctx, tbwr.Message, fields...)
}

func TBWROperationHandler(
@@ -214,15 +233,11 @@ func TBWROperationHandler(
Field: "created_at",
}),
))

var lastTbOp *types.TakeBackupOperation
var existingTbOps = 0
for i := range ops {
existingTbOps++
lastTbOp = ops[i].(*types.TakeBackupOperation)
if len(ops) > 0 {
lastTbOp = ops[len(ops)-1].(*types.TakeBackupOperation)
}
//to update retry count for every operation that was launched before introducing
//persistent retry counter
tbwr.Retries = max(tbwr.Retries, existingTbOps)

if err != nil {
return fmt.Errorf("can't select Operations for TBWR op %s", tbwr.ID)
@@ -233,6 +248,7 @@ func TBWROperationHandler(
{
do, err := MakeRetryDecision(ctx, tbwr, lastTbOp, clock)
if err != nil {
xlog.Error(ctx, "RetryDecision failed", zap.Error(err))
tbwr.State = types.OperationStateError
tbwr.Message = err.Error()
now := clock.Now()
@@ -271,8 +287,7 @@ func TBWROperationHandler(
return nil
case Error:
{
setErrorToRetryOperation(ctx, tbwr, ops, clock)
xlog.Info(ctx, tbwr.Proto().String())
setErrorToRetryOperation(ctx, tbwr, ops, clock, false)
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
}
case RunNewTb:
@@ -291,7 +306,7 @@ func TBWROperationHandler(
var empty *backup_operations.EmptyDatabaseError

if errors.As(err, &empty) {
setErrorToRetryOperation(ctx, tbwr, ops, clock)
setErrorToRetryOperation(ctx, tbwr, ops, clock, true)
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
} else {
tbwr.IncRetries()
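
Note: the hunks above change how the TBWR error message is chosen. Below is a minimal, self-contained sketch of just that selection logic, under stated assumptions: RetryConfig, RetryCount, and RetryMaxBackoff are stand-ins approximating the project's pb.RetryConfig variants, and retryErrorMessage is a hypothetical helper rather than code from the repository.

package main

import "fmt"

// Stand-ins for the pb.RetryConfig variants referenced in the diff (assumed shape).
type RetryConfig interface{ isRetryConfig() }

type RetryCount struct{ Count int }
type RetryMaxBackoff struct{}

func (RetryCount) isRetryConfig()      {}
func (RetryMaxBackoff) isRetryConfig() {}

// retryErrorMessage mirrors the branching added to setErrorToRetryOperation:
// an empty database short-circuits, otherwise the message depends on whether
// a count or backoff limit was configured, defaulting to a single attempt.
func retryErrorMessage(cfg RetryConfig, retries int, isEmptyDb bool) string {
	if isEmptyDb {
		return "empty database"
	}
	if cfg == nil {
		return "retry attempts exceeded limit: 1."
	}
	switch cfg.(type) {
	case RetryCount:
		return fmt.Sprintf("retry attempts exceeded limit: %d.", retries)
	case RetryMaxBackoff:
		return "retry attempts exceeded backoff duration."
	}
	return ""
}

func main() {
	fmt.Println(retryErrorMessage(nil, 0, true))                   // empty database
	fmt.Println(retryErrorMessage(RetryCount{Count: 3}, 3, false)) // retry attempts exceeded limit: 3.
	fmt.Println(retryErrorMessage(RetryMaxBackoff{}, 5, false))    // retry attempts exceeded backoff duration.
	fmt.Println(retryErrorMessage(nil, 1, false))                  // retry attempts exceeded limit: 1.
}

The real handler additionally appends the IDs of previously launched operations to the message and logs it with xlog.Error, which this sketch omits.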
1 change: 1 addition & 0 deletions internal/handlers/take_backup_retry_test.go
@@ -514,6 +514,7 @@ func TestTBWRHandlerError(t *testing.T) {
Audit: &pb.AuditInfo{},
},
RetryConfig: nil,
Retries: 1,
}

ops := []types.Operation{
