Skip to content

Commit

Permalink
fix(DA): use expo backoff in retries (#739)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <[email protected]>
  • Loading branch information
2 people authored and omritoptix committed Apr 30, 2024
1 parent 89a38c6 commit 8635252
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 36 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-04-28)
# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-04-30)


### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func getAddress(key crypto.PrivKey) ([]byte, error) {

func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
eventData := event.Data().(*events.DataHealthStatus)
m.logger.Info("received health status event", "eventData", eventData)
m.logger.Info("Received node health status event.", "eventData", eventData)
m.shouldProduceBlocksCh <- eventData.Error == nil
}

Expand Down
2 changes: 1 addition & 1 deletion block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
m.logger.Info("block production paused - awaiting positive continuation signal")
shouldProduceBlocks = <-m.shouldProduceBlocksCh
}
m.logger.Info("resumed block resumed")
m.logger.Info("resumed block production")
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions da/avail/avail.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ func (c *DataAvailabilityLayerClient) submitBatchLoop(dataBlob []byte) da.Result
retry.Attempts(c.batchRetryAttempts),
)
if err != nil {
err = fmt.Errorf("broadcast data blob: %w", err)

if !retry.IsRecoverable(err) {
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Expand All @@ -282,16 +284,16 @@ func (c *DataAvailabilityLayerClient) submitBatchLoop(dataBlob []byte) da.Result
},
}
}
err = fmt.Errorf("broadcast data blob: %w", err)
c.logger.Error("broadcasting batch, emitting DA unhealthy event and trying again", "error", err)

res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, err)
if err != nil {
return res
}
c.logger.Error("Submitted bad health event: trying again.", "error", err)
continue
}

c.logger.Debug("Successfully submitted DA batch")
c.logger.Debug("Successfully submitted batch.")
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, nil)
if err != nil {
return res
Expand Down
44 changes: 28 additions & 16 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"time"

"github.com/dymensionxyz/dymint/gerr"

"github.com/avast/retry-go/v4"
"github.com/celestiaorg/nmt"
"github.com/gogo/protobuf/proto"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/dymint/types"
pb "github.com/dymensionxyz/dymint/types/pb/dymint"
uretry "github.com/dymensionxyz/dymint/utils/retry"
)

// DataAvailabilityLayerClient use celestia-node public API.
Expand All @@ -36,7 +39,7 @@ type DataAvailabilityLayerClient struct {
cancel context.CancelFunc
rpcRetryDelay time.Duration
rpcRetryAttempts int
submitRetryDelay time.Duration
submitBackoff uretry.BackoffConfig
}

var (
Expand Down Expand Up @@ -65,10 +68,10 @@ func WithRPCAttempts(attempts int) da.Option {
}
}

// WithSubmitRetryDelay sets submit retry delay.
func WithSubmitRetryDelay(delay time.Duration) da.Option {
// WithSubmitBackoff sets submit retry delay config.
func WithSubmitBackoff(c uretry.BackoffConfig) da.Option {
return func(daLayerClient da.DataAvailabilityLayerClient) {
daLayerClient.(*DataAvailabilityLayerClient).submitRetryDelay = delay
daLayerClient.(*DataAvailabilityLayerClient).submitBackoff = c
}
}

Expand Down Expand Up @@ -104,7 +107,7 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
// Set defaults
c.rpcRetryAttempts = defaultRpcCheckAttempts
c.rpcRetryDelay = defaultRpcRetryDelay
c.submitRetryDelay = defaultSubmitRetryDelay
c.submitBackoff = defaultSubmitBackoff

c.ctx, c.cancel = context.WithCancel(context.Background())

Expand Down Expand Up @@ -199,29 +202,33 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: fmt.Sprintf("size bigger than maximum blob size of %d bytes", celtypes.DefaultMaxBytes),
Message: fmt.Sprintf("size bigger than maximum blob size: max n bytes: %d", celtypes.DefaultMaxBytes),
Error: errors.New("blob size too big"),
},
}
}

backoff := c.submitBackoff.Backoff()

for {
select {
case <-c.ctx.Done():
c.logger.Debug("Context cancelled")
c.logger.Debug("Context cancelled.")
return da.ResultSubmitBatch{}
default:

// TODO(srene): Split batch in multiple blobs if necessary if supported
height, commitment, err := c.submit(data)
if err != nil {
err = fmt.Errorf("submit batch: %w", err)
c.logger.Error("submit DA batch. Emitting health event and trying again", "error", err)

res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, err)
if err != nil {
return res
}
time.Sleep(c.submitRetryDelay)

c.logger.Error("Submitted bad health event: trying again.", "error", err)
backoff.Sleep()
continue
}

Expand All @@ -236,13 +243,15 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS

result := c.CheckBatchAvailability(daMetaData)
if result.Code != da.StatusSuccess {
err = fmt.Errorf("submitted batch but did not get availability success: %w", err)
c.logger.Error("unable to confirm submitted blob availability, retrying")
err = fmt.Errorf("check batch availability: submitted batch but did not get availability success status: %w", err)

res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, err)
if err != nil {
return res
}
time.Sleep(c.submitRetryDelay)

c.logger.Error("Submitted bad health event: trying again.", "error", err)
backoff.Sleep()
continue
}
daMetaData.Root = result.CheckMetaData.Root
Expand All @@ -253,6 +262,9 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
if err != nil {
return res
}

c.logger.Debug("Batch accepted, emitted healthy event.")

return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Expand Down Expand Up @@ -533,11 +545,11 @@ func (c *DataAvailabilityLayerClient) checkBatchAvailability(daMetaData *da.DASu
func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitment, error) {
blobs, commitments, err := c.blobsAndCommitments(daBlob)
if err != nil {
return 0, nil, err
return 0, nil, fmt.Errorf("blobs and commitments: %w", err)
}

if len(commitments) == 0 {
return 0, nil, errors.New("no commitments found")
return 0, nil, fmt.Errorf("zero commitments: %w", gerr.ErrNotFound)
}

options := openrpc.DefaultSubmitOptions()
Expand All @@ -557,7 +569,7 @@ func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitm

height, err := c.rpc.Submit(ctx, blobs, options)
if err != nil {
return 0, nil, err
return 0, nil, fmt.Errorf("rpc submit: %w", err)
}
c.logger.Info("Successfully submitted blobs to Celestia", "height", height, "gas", options.GasLimit, "fee", options.Fee)

Expand Down Expand Up @@ -588,7 +600,7 @@ func (c *DataAvailabilityLayerClient) blobsAndCommitments(daBlob da.Blob) ([]*bl

commitment, err := blob.CreateCommitment(b)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("create commitment: %w", err)
}

commitments = append(commitments, commitment)
Expand Down
9 changes: 8 additions & 1 deletion da/celestia/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@ import (
"fmt"
"time"

uretry "github.com/dymensionxyz/dymint/utils/retry"

openrpcns "github.com/rollkit/celestia-openrpc/types/namespace"
)

const (
defaultRpcRetryDelay = 15 * time.Second
defaultSubmitRetryDelay = 5 * time.Second
defaultRpcCheckAttempts = 5
namespaceVersion = 0
defaultGasPrices = 0.1
defaultGasAdjustment float64 = 1.3
)

// defaultSubmitBackoff is the default backoff configuration for batch
// submission retries. It is built with an initial delay of 4 seconds, a
// maximum delay of 30 seconds and a growth factor of 1.6.
// NOTE(review): presumably the delay is multiplied by the growth factor after
// each retry until it reaches the maximum — confirm against utils/retry.
var defaultSubmitBackoff = uretry.NewBackoffConfig(
uretry.WithInitialDelay(time.Second*4),
uretry.WithMaxDelay(time.Second*30),
uretry.WithGrowthFactor(1.6),
)

// Config stores Celestia DALC configuration parameters.
type Config struct {
BaseURL string `json:"base_url"`
Expand Down
4 changes: 3 additions & 1 deletion da/celestia/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

uretry "github.com/dymensionxyz/dymint/utils/retry"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/da/celestia"
mocks "github.com/dymensionxyz/dymint/mocks/da/celestia"
Expand Down Expand Up @@ -107,7 +109,7 @@ func TestSubmitBatch(t *testing.T) {
mockRPCClient := mocks.NewCelestiaRPCClient(t)
// Configure DALC options
options := []da.Option{
celestia.WithSubmitRetryDelay(10 * time.Millisecond),
celestia.WithSubmitBackoff(uretry.NewBackoffConfig(uretry.WithInitialDelay(10 * time.Millisecond))),
celestia.WithRPCClient(mockRPCClient),
}
// Subscribe to the health status event
Expand Down
10 changes: 9 additions & 1 deletion node/events/node.go → node/events/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package events

import uevent "github.com/dymensionxyz/dymint/utils/event"
import (
"fmt"

uevent "github.com/dymensionxyz/dymint/utils/event"
)

// Type Keys
const (
Expand All @@ -23,6 +27,10 @@ type DataHealthStatus struct {
Error error
}

// String implements fmt.Stringer so that a DataHealthStatus is rendered with
// its Error value (or <nil>) when it is printed or logged.
func (dhs DataHealthStatus) String() string {
	const layout = "DataHealthStatus{Error: %v}"
	return fmt.Sprintf(layout, dhs.Error)
}

// Queries

var QueryHealthStatus = uevent.QueryFor(NodeTypeKey, HealthStatus)
26 changes: 26 additions & 0 deletions node/events/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package events

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

// TestStringer checks that formatting a DataHealthStatus through fmt includes
// its Error field, both when the error is nil and when it is set.
func TestStringer(t *testing.T) {
	t.Run("health: nil err", func(t *testing.T) {
		hs := DataHealthStatus{}
		s := fmt.Sprint(hs)
		require.Contains(t, s, "nil")
		// Use Log rather than Logf: s is data, not a format string, so any
		// '%' in it must not be interpreted as a formatting verb.
		t.Log(s)
	})
	t.Run("health: some err", func(t *testing.T) {
		text := "oops"
		hs := DataHealthStatus{Error: errors.New(text)}
		s := fmt.Sprint(hs)
		require.Contains(t, s, text)
	})
}
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func (n *Node) onBaseLayerHealthUpdate(event pubsub.Message) {
if shouldPublish {
evt := &events.DataHealthStatus{Error: newStatus}
if newStatus != nil {
n.Logger.Error("node is unhealthy: base layer has problem", "error", newStatus)
n.Logger.Error("Node is unhealthy: base layer has problem.", "error", newStatus)
}
uevent.MustPublish(n.Ctx, n.PubsubServer, evt, events.HealthStatusList)
}
Expand Down
21 changes: 11 additions & 10 deletions settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
uevent.MustPublish(d.ctx, d.pubsub, &settlement.EventDataHealth{Error: err}, settlement.EventHealthStatusList)

d.logger.Error(
"submit batch to settlement layer, emitted unhealthy event",

"Submitted bad health event: trying again.",
"startHeight",
batch.StartHeight,
"endHeight",
Expand All @@ -257,13 +256,16 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
select {
case <-d.ctx.Done():
return d.ctx.Err()

case <-subscription.Cancelled():
return fmt.Errorf("subscription canceled: %w", err)
return fmt.Errorf("subscription cancelled: %w", err)

case <-subscription.Out():
uevent.MustPublish(d.ctx, d.pubsub, &settlement.EventDataHealth{}, settlement.EventHealthStatusList)
d.logger.Debug("batch accepted by settlement layer, emitted healthy event",
"startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
d.logger.Debug("Batch accepted: emitted healthy event.", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight)

return nil

case <-timer.C:
// Before emitting unhealthy event, check if the batch was accepted by the settlement
// layer, and we've just missed the event.
Expand All @@ -275,7 +277,7 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
uevent.MustPublish(d.ctx, d.pubsub, &settlement.EventDataHealth{Error: err}, settlement.EventHealthStatusList)

d.logger.Error(
"batch not accepted by settlement layer, emitted unhealthy event",
"Submitted bad health event: trying again.",
"startHeight",
batch.StartHeight,
"endHeight",
Expand All @@ -284,15 +286,14 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
err,
)

// restart the loop
timer.Stop()
timer.Stop() // we don't forget to clean up
continue
}

// all good
d.logger.Info("batch accepted by settlement layer", "startHeight", includedBatch.StartHeight, "endHeight", includedBatch.EndHeight)

uevent.MustPublish(d.ctx, d.pubsub, &settlement.EventDataHealth{}, settlement.EventHealthStatusList)
d.logger.Info("Batch accepted, emitted healthy event.", "startHeight", includedBatch.StartHeight, "endHeight", includedBatch.EndHeight)

return nil
}
}
Expand Down
Loading

0 comments on commit 8635252

Please sign in to comment.