Skip to content

Commit

Permalink
feat(da): enable da rotation (#1368)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Tsitrin <[email protected]>
Co-authored-by: Michael Tsitrin <[email protected]>
  • Loading branch information
3 people authored Feb 13, 2025
1 parent 60b92da commit fdd0316
Show file tree
Hide file tree
Showing 35 changed files with 482 additions and 213 deletions.
2 changes: 1 addition & 1 deletion block/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (m *Manager) checkBalances() (*Balances, error) {

go func() {
defer wg.Done()
balance, err := m.DAClient.GetSignerBalance()
balance, err := m.GetActiveDAClient().GetSignerBalance()
if err != nil {
errDA = fmt.Errorf("get DA signer balance: %w", err)
return
Expand Down
5 changes: 5 additions & 0 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("save block source: %w", err)
}

_, err = m.Store.SaveDA(block.Header.Height, responses.EndBlock.RollappParamUpdates.Da, nil)
if err != nil {
return fmt.Errorf("save DA: %w", err)
}

_, err = m.Store.SaveDRSVersion(block.Header.Height, responses.EndBlock.RollappParamUpdates.DrsVersion, nil)
if err != nil {
return fmt.Errorf("add drs version: %w", err)
Expand Down
17 changes: 7 additions & 10 deletions block/fraud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ func TestP2PBlockWithFraud(t *testing.T) {
manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, 1, 1, 0, proxyApp, nil)
require.NoError(t, err)
require.NotNil(t, manager)
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
manager.DAClients[da.Mock] = testutil.GetMockDALC(log.TestingLogger())

// mock executor that returns ErrFault on ExecuteBlock
mockExecutor := &blockmocks.MockExecutorI{}
Expand Down Expand Up @@ -163,8 +162,7 @@ func TestLocalBlockWithFraud(t *testing.T) {
manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, 1, 1, 0, proxyApp, nil)
require.NoError(t, err)
require.NotNil(t, manager)
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
manager.DAClients[da.Mock] = testutil.GetMockDALC(log.TestingLogger())

numBatchesToAdd := 2
nextBatchStartHeight := manager.NextHeightToSubmit()
Expand All @@ -182,10 +180,10 @@ func TestLocalBlockWithFraud(t *testing.T) {
_, err = manager.Store.SaveBlock(batch.Blocks[0], batch.Commits[0], nil)
require.NoError(t, err)

daResultSubmitBatch := manager.DAClient.SubmitBatch(batch)
daResultSubmitBatch := manager.GetActiveDAClient().SubmitBatch(batch)
assert.Equal(t, daResultSubmitBatch.Code, da.StatusSuccess)

err = manager.SLClient.SubmitBatch(batch, manager.DAClient.GetClientType(), &daResultSubmitBatch)
err = manager.SLClient.SubmitBatch(batch, manager.GetActiveDAClient().GetClientType(), &daResultSubmitBatch)
require.NoError(t, err)

nextBatchStartHeight = batch.EndHeight() + 1
Expand Down Expand Up @@ -280,8 +278,7 @@ func TestApplyBatchFromSLWithFraud(t *testing.T) {
manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, 1, 1, 0, proxyApp, mockStore)
require.NoError(err)
commitHash := [32]byte{1}
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
manager.DAClients[da.Mock] = testutil.GetMockDALC(log.TestingLogger())
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: commitHash[:]})

// submit batch
Expand All @@ -293,9 +290,9 @@ func TestApplyBatchFromSLWithFraud(t *testing.T) {
[32]byte{},
)
require.NoError(err)
daResultSubmitBatch := manager.DAClient.SubmitBatch(batch)
daResultSubmitBatch := manager.GetActiveDAClient().SubmitBatch(batch)
require.Equal(daResultSubmitBatch.Code, da.StatusSuccess)
err = manager.SLClient.SubmitBatch(batch, manager.DAClient.GetClientType(), &daResultSubmitBatch)
err = manager.SLClient.SubmitBatch(batch, manager.GetActiveDAClient().GetClientType(), &daResultSubmitBatch)
require.NoError(err)

// Mock Executor to return ErrFraud
Expand Down
64 changes: 22 additions & 42 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/da/registry"
"github.com/dymensionxyz/dymint/indexers/txindex"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/store"
Expand Down Expand Up @@ -62,7 +61,7 @@ type Manager struct {
// Clients and servers
Pubsub *pubsub.Server
P2PClient *p2p.Client
DAClient da.DataAvailabilityLayerClient
DAClients map[da.Client]da.DataAvailabilityLayerClient
SLClient settlement.ClientI

// RunMode represents the mode of the node. Set during initialization and shouldn't change after that.
Expand All @@ -89,9 +88,6 @@ type Manager struct {
// indexer
IndexerService *txindex.IndexerService

// used to fetch blocks from DA. Sequencer will only fetch batches in case it requires to re-sync (in case of rollback). Full-node will fetch batches for syncing and validation.
Retriever da.BatchRetriever

/*
Full-node only
*/
Expand Down Expand Up @@ -132,10 +128,10 @@ func NewManager(
mempool mempool.Mempool,
proxyApp proxy.AppConns,
settlementClient settlement.ClientI,
daClients []da.DataAvailabilityLayerClient,
eventBus *tmtypes.EventBus,
pubsub *pubsub.Server,
p2pClient *p2p.Client,
dalcKV *store.PrefixKV,
indexerService *txindex.IndexerService,
logger log.Logger,
) (*Manager, error) {
Expand Down Expand Up @@ -168,6 +164,7 @@ func NewManager(
Executor: exec,
Sequencers: types.NewSequencerSet(),
SLClient: settlementClient,
DAClients: make(map[da.Client]da.DataAvailabilityLayerClient),
IndexerService: indexerService,
logger: logger.With("module", "block_manager"),
blockCache: &Cache{
Expand All @@ -178,20 +175,14 @@ func NewManager(
settlementValidationC: make(chan struct{}, 1), // use of buffered channel to avoid blocking. In case channel is full, its skipped because there is an ongoing validation process, but validation height is updated, which means the ongoing validation will validate to the new height.
syncedFromSettlement: uchannel.NewNudger(), // used by the sequencer to wait till the node completes the syncing from settlement.
}
err = m.LoadStateOnInit(store, genesis, logger)
if err != nil {
return nil, fmt.Errorf("get initial state: %w", err)
}

err = m.setDA(conf.DAConfig, dalcKV, logger)
if err != nil {
return nil, err
for _, client := range daClients {
m.DAClients[client.GetClientType()] = client
}

// validate configuration params and rollapp consensus params are in line
err = m.ValidateConfigWithRollappParams()
err = m.LoadStateOnInit(store, genesis, logger)
if err != nil {
return nil, err
return nil, fmt.Errorf("get initial state: %w", err)
}

m.SettlementValidator = NewSettlementValidator(m.logger, m)
Expand All @@ -217,8 +208,14 @@ func (m *Manager) Start(ctx context.Context) error {
}
}

// validate configuration params and rollapp consensus params are in line
err := m.ValidateConfigWithRollappParams()
if err != nil {
return err
}

// update dymint state with next revision info
err := m.updateStateForNextRevision()
err = m.updateStateForNextRevision()
if err != nil {
return err
}
Expand Down Expand Up @@ -410,38 +407,17 @@ func (m *Manager) UpdateTargetHeight(h uint64) {

// ValidateConfigWithRollappParams checks the configuration params are consistent with the params in the dymint state (e.g. DA and version)
func (m *Manager) ValidateConfigWithRollappParams() error {
if da.Client(m.State.RollappParams.Da) != m.DAClient.GetClientType() {
return fmt.Errorf("da client mismatch. rollapp param: %s da configured: %s", m.State.RollappParams.Da, m.DAClient.GetClientType())
if m.GetActiveDAClient() == nil {
return fmt.Errorf("missing da layer in config. da: %s", m.State.RollappParams.Da)
}

if m.Conf.BatchSubmitBytes > m.DAClient.GetMaxBlobSizeBytes() {
return fmt.Errorf("batch size above limit: batch size: %d limit: %d: DA %s", m.Conf.BatchSubmitBytes, m.DAClient.GetMaxBlobSizeBytes(), m.DAClient.GetClientType())
if m.Conf.BatchSubmitBytes > m.GetActiveDAClient().GetMaxBlobSizeBytes() {
return fmt.Errorf("batch size above limit: batch size: %d limit: %d: DA %s", m.Conf.BatchSubmitBytes, m.GetActiveDAClient().GetMaxBlobSizeBytes(), m.GetActiveDAClient().GetClientType())
}

return nil
}

// setDA initializes DA client in blockmanager according to DA type set in genesis or stored in state
func (m *Manager) setDA(daconfig string, dalcKV store.KV, logger log.Logger) error {
daLayer := m.State.RollappParams.Da
dalc := registry.GetClient(daLayer)
if dalc == nil {
return fmt.Errorf("get data availability client named '%s'", daLayer)
}

err := dalc.Init([]byte(daconfig), m.Pubsub, dalcKV, logger.With("module", string(dalc.GetClientType())))
if err != nil {
return fmt.Errorf("data availability layer client initialization: %w", err)
}
m.DAClient = dalc
retriever, ok := dalc.(da.BatchRetriever)
if !ok {
return fmt.Errorf("data availability layer client is not of type BatchRetriever")
}
m.Retriever = retriever
return nil
}

// setFraudHandler sets the fraud handler for the block manager.
func (m *Manager) setFraudHandler(handler *FreezeHandler) {
m.FraudHandler = handler
Expand All @@ -460,3 +436,7 @@ func (m *Manager) setUnhealthy(err error) {
func (m *Manager) setHealthy() {
uevent.MustPublish(context.Background(), m.Pubsub, &events.DataHealthStatus{Error: nil}, events.HealthStatusList)
}

func (m *Manager) GetActiveDAClient() da.DataAvailabilityLayerClient {
return m.DAClients[da.Client(m.State.RollappParams.Da)]
}
23 changes: 11 additions & 12 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func TestInitialState(t *testing.T) {
settlementlc := slregistry.GetClient(slregistry.Local)
_ = settlementlc.Init(settlement.Config{}, genesis.ChainID, pubsubServer, logger)

daclient := []da.DataAvailabilityLayerClient{testutil.GetMockDALC(logger)}

// Init empty store and full store
emptyStore := store.New(store.NewDefaultInMemoryKVStore())
fullStore := store.New(store.NewDefaultInMemoryKVStore())
Expand Down Expand Up @@ -116,8 +118,8 @@ func TestInitialState(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
agg, err := block.NewManager(key, conf, c.genesis, "", c.store, nil, proxyApp, settlementlc,
nil, pubsubServer, p2pClient, nil, nil, logger)
agg, err := block.NewManager(key, conf, c.genesis, "", c.store, nil, proxyApp, settlementlc, daclient,
nil, pubsubServer, p2pClient, nil, logger)
assert.NoError(err)
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.State.ChainID)
Expand Down Expand Up @@ -161,8 +163,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
require.NotNil(t, manager)

t.Log("Taking the manager out of sync by submitting a batch")
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
manager.DAClients[da.Mock] = testutil.GetMockDALC(log.TestingLogger())

numBatchesToAdd := 2
nextBatchStartHeight := manager.NextHeightToSubmit()
Expand All @@ -174,9 +175,9 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
lastBlockHeaderHash,
)
assert.NoError(t, err)
daResultSubmitBatch := manager.DAClient.SubmitBatch(batch)
daResultSubmitBatch := manager.GetActiveDAClient().SubmitBatch(batch)
assert.Equal(t, daResultSubmitBatch.Code, da.StatusSuccess)
err = manager.SLClient.SubmitBatch(batch, manager.DAClient.GetClientType(), &daResultSubmitBatch)
err = manager.SLClient.SubmitBatch(batch, manager.GetActiveDAClient().GetClientType(), &daResultSubmitBatch)
require.NoError(t, err)
nextBatchStartHeight = batch.EndHeight() + 1
lastBlockHeaderHash = batch.Blocks[len(batch.Blocks)-1].Header.Hash()
Expand Down Expand Up @@ -206,8 +207,7 @@ func TestRetrieveDaBatchesFailed(t *testing.T) {
manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, 1, 1, 0, nil, nil)
require.NoError(t, err)
require.NotNil(t, manager)
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
manager.DAClients[da.Mock] = testutil.GetMockDALC(log.TestingLogger())

submitMetadata := local.SubmitMetaData{
Height: 1,
Expand Down Expand Up @@ -513,8 +513,7 @@ func TestDAFetch(t *testing.T) {
require.NoError(err)
commitHash := [32]byte{}

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
manager.DAClients[da.Mock] = testutil.GetMockDALC(log.TestingLogger())

app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: commitHash[:]})

Expand All @@ -526,9 +525,9 @@ func TestDAFetch(t *testing.T) {
[32]byte{},
)
require.NoError(err)
daResultSubmitBatch := manager.DAClient.SubmitBatch(batch)
daResultSubmitBatch := manager.GetActiveDAClient().SubmitBatch(batch)
require.Equal(daResultSubmitBatch.Code, da.StatusSuccess)
err = manager.SLClient.SubmitBatch(batch, manager.DAClient.GetClientType(), &daResultSubmitBatch)
err = manager.SLClient.SubmitBatch(batch, manager.GetActiveDAClient().GetClientType(), &daResultSubmitBatch)
require.NoError(err)

cases := []struct {
Expand Down
6 changes: 2 additions & 4 deletions block/production_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ func TestUpdateInitialSequencerSet(t *testing.T) {
manager, err := testutil.GetManagerWithProposerKey(testutil.GetManagerConfig(), lib2pPrivKey, slmock, 1, 1, 0, proxyApp, nil)
require.NoError(err)

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
manager.DAClients[da.Mock] = testutil.GetMockDALC(log.TestingLogger())

// Check initial assertions
require.Zero(manager.State.Height())
Expand Down Expand Up @@ -465,8 +464,7 @@ func TestUpdateExistingSequencerSet(t *testing.T) {
manager, err := testutil.GetManagerWithProposerKey(testutil.GetManagerConfig(), lib2pPrivKey, slmock, 1, 1, 0, proxyApp, nil)
require.NoError(err)

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
manager.DAClients[da.Mock] = testutil.GetMockDALC(log.TestingLogger())

// Set the initial sequencer set
manager.Sequencers.Set([]types.Sequencer{proposer, sequencer})
Expand Down
3 changes: 1 addition & 2 deletions block/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ func TestPruningRetainHeight(t *testing.T) {

manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
manager.DAClients[da.Mock] = testutil.GetMockDALC(log.TestingLogger())

// Check initial assertions
require.Zero(manager.State.Height())
Expand Down
11 changes: 8 additions & 3 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,25 @@ func (m *Manager) applyLocalBlock() error {
return nil
}

func (m *Manager) GetRetriever(da da.Client) da.BatchRetriever {
return m.DAClients[da]
}

func (m *Manager) fetchBatch(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
// Check DA client
if daMetaData.Client != m.DAClient.GetClientType() {
retriever := m.DAClients[daMetaData.Client]
if retriever == nil {
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: fmt.Sprintf("DA client for the batch does not match node config: DA client batch: %s: DA client config: %s", daMetaData.Client, m.DAClient.GetClientType()),
Message: fmt.Sprintf("DA client for the batch is missing in node config: DA client batch: %s", daMetaData.Client),
Error: da.ErrDAMismatch,
},
}
}

// batchRes.MetaData includes proofs necessary to open disputes with the Hub
batchRes := m.Retriever.RetrieveBatches(daMetaData.DAPath)
batchRes := retriever.RetrieveBatches(daMetaData.DAPath)
// TODO(srene) : for invalid transactions there is no specific error code since it will need to be validated somewhere else for fraud proving.
// NMT proofs (availRes.MetaData.Proofs) are included in the result batchRes, necessary to be included in the dispute
return batchRes
Expand Down
16 changes: 14 additions & 2 deletions block/slvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ func NewSettlementValidator(logger types.Logger, blockManager *Manager) *Settlem
func (v *SettlementValidator) ValidateStateUpdate(batch *settlement.ResultRetrieveBatch) error {
v.logger.Debug("validating state update", "start height", batch.StartHeight, "end height", batch.EndHeight)

daClient, err := v.blockManager.Store.LoadDA(batch.EndHeight)
if err != nil {
return err
}
if daClient != string(batch.MetaData.Client) {
return types.NewErrStateUpdateDAFraud(batch.StateIndex, batch.EndHeight, daClient, string(batch.MetaData.Client))
}

// loads blocks applied from P2P, if any.
p2pBlocks := make(map[uint64]*types.Block)
for height := batch.StartHeight; height <= batch.EndHeight; height++ {
Expand Down Expand Up @@ -82,8 +90,12 @@ func (v *SettlementValidator) ValidateStateUpdate(batch *settlement.ResultRetrie
return types.NewErrStateUpdateBlobCorruptedFraud(batch.StateIndex, string(batch.MetaData.Client), batch.MetaData.DAPath)
}

retriever := v.blockManager.GetRetriever(batch.MetaData.Client)
if retriever == nil {
return fmt.Errorf("missing DA in config. DA: %s", batch.MetaData.Client)
}
// fraud detected in case availability checks fail and therefore there certainty the blob, according to the state update DA path, is not available.
checkBatchResult := v.blockManager.Retriever.CheckBatchAvailability(batch.MetaData.DAPath)
checkBatchResult := retriever.CheckBatchAvailability(batch.MetaData.DAPath)
if errors.Is(checkBatchResult.Error, da.ErrBlobNotIncluded) {
return types.NewErrStateUpdateBlobNotAvailableFraud(batch.StateIndex, string(batch.MetaData.Client), batch.MetaData.DAPath)
}
Expand All @@ -98,7 +110,7 @@ func (v *SettlementValidator) ValidateStateUpdate(batch *settlement.ResultRetrie
}

// validate DA blocks against the state update
err := v.ValidateDaBlocks(batch, daBlocks)
err = v.ValidateDaBlocks(batch, daBlocks)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit fdd0316

Please sign in to comment.