From 8635252c83204e42f23186b740cd4391d1634179 Mon Sep 17 00:00:00 2001 From: Daniel T <30197399+danwt@users.noreply.github.com> Date: Tue, 30 Apr 2024 15:42:58 +0100 Subject: [PATCH] fix(DA): use expo backoff in retries (#739) Co-authored-by: github-actions --- CHANGELOG.md | 2 +- block/manager.go | 2 +- block/produce.go | 2 +- da/avail/avail.go | 8 +-- da/celestia/celestia.go | 44 ++++++++++------ da/celestia/config.go | 9 +++- da/celestia/rpc_test.go | 4 +- node/events/{node.go => types.go} | 10 +++- node/events/types_test.go | 26 ++++++++++ node/node.go | 2 +- settlement/dymension/dymension.go | 21 ++++---- utils/retry/backoff.go | 84 +++++++++++++++++++++++++++++++ utils/retry/backoff_test.go | 37 ++++++++++++++ utils/retry/doc.go | 4 ++ 14 files changed, 219 insertions(+), 36 deletions(-) rename node/events/{node.go => types.go} (73%) create mode 100644 node/events/types_test.go create mode 100644 utils/retry/backoff.go create mode 100644 utils/retry/backoff_test.go create mode 100644 utils/retry/doc.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8910906cf..7f75b2527 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-04-28) +# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-04-30) ### Bug Fixes diff --git a/block/manager.go b/block/manager.go index 5b221b8db..132864498 100644 --- a/block/manager.go +++ b/block/manager.go @@ -229,7 +229,7 @@ func getAddress(key crypto.PrivKey) ([]byte, error) { func (m *Manager) onNodeHealthStatus(event pubsub.Message) { eventData := event.Data().(*events.DataHealthStatus) - m.logger.Info("received health status event", "eventData", eventData) + m.logger.Info("Received node health status event.", "eventData", eventData) m.shouldProduceBlocksCh <- eventData.Error == nil } diff --git a/block/produce.go b/block/produce.go index 5af2f9618..4eab5bf36 100644 --- a/block/produce.go +++ b/block/produce.go @@ -71,7 +71,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) { m.logger.Info("block production paused - awaiting positive continuation signal") shouldProduceBlocks = <-m.shouldProduceBlocksCh } - m.logger.Info("resumed block resumed") + m.logger.Info("resumed block production") } } } diff --git a/da/avail/avail.go b/da/avail/avail.go index 650f180b1..1fc440397 100644 --- a/da/avail/avail.go +++ b/da/avail/avail.go @@ -273,6 +273,8 @@ func (c *DataAvailabilityLayerClient) submitBatchLoop(dataBlob []byte) da.Result retry.Attempts(c.batchRetryAttempts), ) if err != nil { + err = fmt.Errorf("broadcast data blob: %w", err) + if !retry.IsRecoverable(err) { return da.ResultSubmitBatch{ BaseResult: da.BaseResult{ @@ -282,16 +284,16 @@ func (c *DataAvailabilityLayerClient) submitBatchLoop(dataBlob []byte) da.Result }, } } - err = fmt.Errorf("broadcast data blob: %w", err) - c.logger.Error("broadcasting batch, emitting DA unhealthy event and trying again", "error", err) + res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, err) if err != nil { return res } + c.logger.Error("Submitted bad health event: trying again.", "error", err) continue } - c.logger.Debug("Successfully submitted DA batch") + c.logger.Debug("Successfully submitted batch.") res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, nil) if err != nil { return res diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go index af42cc68a..48805a203 100644 --- a/da/celestia/celestia.go +++ b/da/celestia/celestia.go @@ -7,6 +7,8 @@ import ( "fmt" "time" + "github.com/dymensionxyz/dymint/gerr" + "github.com/avast/retry-go/v4" "github.com/celestiaorg/nmt" "github.com/gogo/protobuf/proto" @@ -23,6 +25,7 @@ import ( "github.com/dymensionxyz/dymint/store" "github.com/dymensionxyz/dymint/types" pb "github.com/dymensionxyz/dymint/types/pb/dymint" + uretry "github.com/dymensionxyz/dymint/utils/retry" ) // DataAvailabilityLayerClient use celestia-node public API. @@ -36,7 +39,7 @@ type DataAvailabilityLayerClient struct { cancel context.CancelFunc rpcRetryDelay time.Duration rpcRetryAttempts int - submitRetryDelay time.Duration + submitBackoff uretry.BackoffConfig } var ( @@ -65,10 +68,10 @@ func WithRPCAttempts(attempts int) da.Option { } } -// WithSubmitRetryDelay sets submit retry delay. -func WithSubmitRetryDelay(delay time.Duration) da.Option { +// WithSubmitBackoff sets submit retry delay config. +func WithSubmitBackoff(c uretry.BackoffConfig) da.Option { return func(daLayerClient da.DataAvailabilityLayerClient) { - daLayerClient.(*DataAvailabilityLayerClient).submitRetryDelay = delay + daLayerClient.(*DataAvailabilityLayerClient).submitBackoff = c } } @@ -104,7 +107,7 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S // Set defaults c.rpcRetryAttempts = defaultRpcCheckAttempts c.rpcRetryDelay = defaultRpcRetryDelay - c.submitRetryDelay = defaultSubmitRetryDelay + c.submitBackoff = defaultSubmitBackoff c.ctx, c.cancel = context.WithCancel(context.Background()) @@ -199,16 +202,18 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS return da.ResultSubmitBatch{ BaseResult: da.BaseResult{ Code: da.StatusError, - Message: fmt.Sprintf("size bigger than maximum blob size of %d bytes", celtypes.DefaultMaxBytes), + Message: fmt.Sprintf("size bigger than maximum blob size: max n bytes: %d", celtypes.DefaultMaxBytes), Error: errors.New("blob size too big"), }, } } + backoff := c.submitBackoff.Backoff() + for { select { case <-c.ctx.Done(): - c.logger.Debug("Context cancelled") + c.logger.Debug("Context cancelled.") return da.ResultSubmitBatch{} default: @@ -216,12 +221,14 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS height, commitment, err := c.submit(data) if err != nil { err = fmt.Errorf("submit batch: %w", err) - c.logger.Error("submit DA batch. Emitting health event and trying again", "error", err) + res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, err) if err != nil { return res } - time.Sleep(c.submitRetryDelay) + + c.logger.Error("Submitted bad health event: trying again.", "error", err) + backoff.Sleep() continue } @@ -236,13 +243,15 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS result := c.CheckBatchAvailability(daMetaData) if result.Code != da.StatusSuccess { - err = fmt.Errorf("submitted batch but did not get availability success: %w", err) - c.logger.Error("unable to confirm submitted blob availability, retrying") + err = fmt.Errorf("check batch availability: submitted batch but did not get availability success status: %w", err) + res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, err) if err != nil { return res } - time.Sleep(c.submitRetryDelay) + + c.logger.Error("Submitted bad health event: trying again.", "error", err) + backoff.Sleep() continue } daMetaData.Root = result.CheckMetaData.Root @@ -253,6 +262,9 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS if err != nil { return res } + + c.logger.Debug("Batch accepted, emitted healthy event.") + return da.ResultSubmitBatch{ BaseResult: da.BaseResult{ Code: da.StatusSuccess, @@ -533,11 +545,11 @@ func (c *DataAvailabilityLayerClient) checkBatchAvailability(daMetaData *da.DASu func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitment, error) { blobs, commitments, err := c.blobsAndCommitments(daBlob) if err != nil { - return 0, nil, err + return 0, nil, fmt.Errorf("blobs and commitments: %w", err) } if len(commitments) == 0 { - return 0, nil, errors.New("no commitments found") + return 0, nil, fmt.Errorf("zero commitments: %w", gerr.ErrNotFound) } options := openrpc.DefaultSubmitOptions() @@ -557,7 +569,7 @@ func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitm height, err := c.rpc.Submit(ctx, blobs, options) if err != nil { - return 0, nil, err + return 0, nil, fmt.Errorf("rpc submit: %w", err) } c.logger.Info("Successfully submitted blobs to Celestia", "height", height, "gas", options.GasLimit, "fee", options.Fee) @@ -588,7 +600,7 @@ func (c *DataAvailabilityLayerClient) blobsAndCommitments(daBlob da.Blob) ([]*bl commitment, err := blob.CreateCommitment(b) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("create commitment: %w", err) } commitments = append(commitments, commitment) diff --git a/da/celestia/config.go b/da/celestia/config.go index b7c8aaf76..fbed0d964 100644 --- a/da/celestia/config.go +++ b/da/celestia/config.go @@ -6,18 +6,25 @@ import ( "fmt" "time" + uretry "github.com/dymensionxyz/dymint/utils/retry" + openrpcns "github.com/rollkit/celestia-openrpc/types/namespace" ) const ( defaultRpcRetryDelay = 15 * time.Second - defaultSubmitRetryDelay = 5 * time.Second defaultRpcCheckAttempts = 5 namespaceVersion = 0 defaultGasPrices = 0.1 defaultGasAdjustment float64 = 1.3 ) +var defaultSubmitBackoff = uretry.NewBackoffConfig( + uretry.WithInitialDelay(time.Second*4), + uretry.WithMaxDelay(time.Second*30), + uretry.WithGrowthFactor(1.6), +) + // Config stores Celestia DALC configuration parameters. type Config struct { BaseURL string `json:"base_url"` diff --git a/da/celestia/rpc_test.go b/da/celestia/rpc_test.go index 4126ca8fd..af33cdab7 100644 --- a/da/celestia/rpc_test.go +++ b/da/celestia/rpc_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + uretry "github.com/dymensionxyz/dymint/utils/retry" + "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/da/celestia" mocks "github.com/dymensionxyz/dymint/mocks/da/celestia" @@ -107,7 +109,7 @@ func TestSubmitBatch(t *testing.T) { mockRPCClient := mocks.NewCelestiaRPCClient(t) // Configure DALC options options := []da.Option{ - celestia.WithSubmitRetryDelay(10 * time.Millisecond), + celestia.WithSubmitBackoff(uretry.NewBackoffConfig(uretry.WithInitialDelay(10 * time.Millisecond))), celestia.WithRPCClient(mockRPCClient), } // Subscribe to the health status event diff --git a/node/events/node.go b/node/events/types.go similarity index 73% rename from node/events/node.go rename to node/events/types.go index 4f77fe6d0..3af471f10 100644 --- a/node/events/node.go +++ b/node/events/types.go @@ -1,6 +1,10 @@ package events -import uevent "github.com/dymensionxyz/dymint/utils/event" +import ( + "fmt" + + uevent "github.com/dymensionxyz/dymint/utils/event" +) // Type Keys const ( @@ -23,6 +27,10 @@ type DataHealthStatus struct { Error error } +func (dhs DataHealthStatus) String() string { + return fmt.Sprintf("DataHealthStatus{Error: %v}", dhs.Error) +} + // Queries var QueryHealthStatus = uevent.QueryFor(NodeTypeKey, HealthStatus) diff --git a/node/events/types_test.go b/node/events/types_test.go new file mode 100644 index 000000000..5a7678f57 --- /dev/null +++ b/node/events/types_test.go @@ -0,0 +1,26 @@ +package events + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStringer(t *testing.T) { + // Double checking :) + + t.Run("health: nil err", func(t *testing.T) { + hs := DataHealthStatus{} + s := fmt.Sprint(hs) + require.Contains(t, s, "nil") + t.Logf(s) + }) + t.Run("health: some err", func(t *testing.T) { + text := "oops" + hs := DataHealthStatus{Error: errors.New(text)} + s := fmt.Sprint(hs) + require.Contains(t, s, text) + }) +} diff --git a/node/node.go b/node/node.go index 751a054bf..1e6a8d274 100644 --- a/node/node.go +++ b/node/node.go @@ -430,7 +430,7 @@ func (n *Node) onBaseLayerHealthUpdate(event pubsub.Message) { if shouldPublish { evt := &events.DataHealthStatus{Error: newStatus} if newStatus != nil { - n.Logger.Error("node is unhealthy: base layer has problem", "error", newStatus) + n.Logger.Error("Node is unhealthy: base layer has problem.", "error", newStatus) } uevent.MustPublish(n.Ctx, n.PubsubServer, evt, events.HealthStatusList) } diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go index 3fc2919a9..a342c98a3 100644 --- a/settlement/dymension/dymension.go +++ b/settlement/dymension/dymension.go @@ -234,8 +234,7 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult * uevent.MustPublish(d.ctx, d.pubsub, &settlement.EventDataHealth{Error: err}, settlement.EventHealthStatusList) d.logger.Error( - "submit batch to settlement layer, emitted unhealthy event", - + "Submitted bad health event: trying again.", "startHeight", batch.StartHeight, "endHeight", @@ -257,13 +256,16 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult * select { case <-d.ctx.Done(): return d.ctx.Err() + case <-subscription.Cancelled(): - return fmt.Errorf("subscription canceled: %w", err) + return fmt.Errorf("subscription cancelled: %w", err) + case <-subscription.Out(): uevent.MustPublish(d.ctx, d.pubsub, &settlement.EventDataHealth{}, settlement.EventHealthStatusList) - d.logger.Debug("batch accepted by settlement layer, emitted healthy event", - "startHeight", batch.StartHeight, "endHeight", batch.EndHeight) + d.logger.Debug("Batch accepted: emitted healthy event.", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight) + return nil + case <-timer.C: // Before emitting unhealthy event, check if the batch was accepted by the settlement // layer, and we've just missed the event. @@ -275,7 +277,7 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult * uevent.MustPublish(d.ctx, d.pubsub, &settlement.EventDataHealth{Error: err}, settlement.EventHealthStatusList) d.logger.Error( - "batch not accepted by settlement layer, emitted unhealthy event", + "Submitted bad health event: trying again.", "startHeight", batch.StartHeight, "endHeight", @@ -284,15 +286,14 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult * err, ) - // restart the loop - timer.Stop() + timer.Stop() // we don't forget to clean up continue } // all good - d.logger.Info("batch accepted by settlement layer", "startHeight", includedBatch.StartHeight, "endHeight", includedBatch.EndHeight) - uevent.MustPublish(d.ctx, d.pubsub, &settlement.EventDataHealth{}, settlement.EventHealthStatusList) + d.logger.Info("Batch accepted, emitted healthy event.", "startHeight", includedBatch.StartHeight, "endHeight", includedBatch.EndHeight) + return nil } } diff --git a/utils/retry/backoff.go b/utils/retry/backoff.go new file mode 100644 index 000000000..a416e3ecc --- /dev/null +++ b/utils/retry/backoff.go @@ -0,0 +1,84 @@ +package retry + +import ( + "time" +) + +const ( + defaultBackoffInitialDelay = 200 * time.Millisecond + defaultBackoffMaxDelay = 30 * time.Second + defaultBackoffFactor = 2 +) + +// BackoffConfig is a configuration for a backoff, it's used to create new instances +type BackoffConfig struct { + InitialDelay time.Duration + MaxDelay time.Duration + GrowthFactor float64 +} + +// Backoff creates a new Backoff instance with the configuration (starting at 0 attempts made so far) +func (c BackoffConfig) Backoff() Backoff { + return Backoff{ + delay: c.InitialDelay, + maxDelay: c.MaxDelay, + growthFactor: c.GrowthFactor, + } +} + +type Backoff struct { + delay time.Duration + maxDelay time.Duration + growthFactor float64 +} + +type BackoffOption func(*BackoffConfig) + +func WithInitialDelay(d time.Duration) BackoffOption { + return func(b *BackoffConfig) { + b.InitialDelay = d + } +} + +// WithMaxDelay sets the maximum delay for the backoff. The delay will not exceed this value. +// Set 0 to disable the maximum delay. +func WithMaxDelay(d time.Duration) BackoffOption { + return func(b *BackoffConfig) { + b.MaxDelay = d + } +} + +// WithGrowthFactor sets the growth factor for the backoff. The delay will be multiplied by this factor on each call to Delay. +// The factor should be greater than 1.0 +func WithGrowthFactor(x float64) BackoffOption { + return func(b *BackoffConfig) { + b.GrowthFactor = x + } +} + +func NewBackoffConfig(opts ...BackoffOption) BackoffConfig { + ret := BackoffConfig{ + InitialDelay: defaultBackoffInitialDelay, + MaxDelay: defaultBackoffMaxDelay, + GrowthFactor: defaultBackoffFactor, + } + for _, o := range opts { + o(&ret) + } + return ret +} + +// Delay returns the current delay. The subsequent delay will be increased by the growth factor up to the maximum. +func (b *Backoff) Delay() time.Duration { + ret := b.delay + b.delay = time.Duration(float64(b.delay) * b.growthFactor) + if b.maxDelay != 0 { + b.delay = min(b.delay, b.maxDelay) + } + return ret +} + +// Sleep sleeps for the current delay. The subsequent delay will be increased by the growth factor up to the maximum. +func (b *Backoff) Sleep() { + time.Sleep(b.Delay()) +} diff --git a/utils/retry/backoff_test.go b/utils/retry/backoff_test.go new file mode 100644 index 000000000..37c732f36 --- /dev/null +++ b/utils/retry/backoff_test.go @@ -0,0 +1,37 @@ +package retry + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBackoff(t *testing.T) { + t.Run("default", func(t *testing.T) { + b := NewBackoffConfig().Backoff() + last := time.Duration(0) + for range 9 { // growing + d := b.Delay() + require.Less(t, last, d) + last = d + } + for range 4 { // maxed out + d := b.Delay() + require.Equal(t, last, d) + last = d + } + }) + t.Run("decimal growth factor", func(t *testing.T) { + initial := time.Second + factor := 1.5 + b := NewBackoffConfig(WithInitialDelay(initial), WithGrowthFactor(factor), WithMaxDelay(0)).Backoff() + b.Delay() // skip first so that last is initial + last := initial + for range 10 { + d := b.Delay() + require.Equal(t, time.Duration(float64(last)*factor), d) + last = d + } + }) +} diff --git a/utils/retry/doc.go b/utils/retry/doc.go new file mode 100644 index 000000000..fe69a7266 --- /dev/null +++ b/utils/retry/doc.go @@ -0,0 +1,4 @@ +// Package retry shall be used alongside "github.com/avast/retry-go/v4" for simple retry patterns +// which the avast package makes difficult. +// Methods in here should be simple and not warrant another dependency. +package retry