Skip to content

Commit

Permalink
Merge branch 'main' into release/v0.5.x
Browse files Browse the repository at this point in the history
# Conflicts:
#	block/manager.go
ItayLevyOfficial committed Oct 2, 2023
2 parents cecaf89 + b023347 commit 65bc962
Showing 18 changed files with 929 additions and 797 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ jobs:
GOPRIVATE: "github.com/dymensionxyz/*"
GH_ACCESS_TOKEN: "${{ secrets.GH_ACCESS_TOKEN }}"
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: '1.19'
@@ -46,7 +46,7 @@ jobs:
markdownlint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: markdownlint-cli
uses: nosborn/github-action-markdown-cli@v3.3.0
with:
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ jobs:
GOPRIVATE: "github.com/dymensionxyz/*"
GH_ACCESS_TOKEN: "${{ secrets.GH_ACCESS_TOKEN }}"
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -8,9 +8,10 @@ COMMIT_HASH := $(shell git rev-parse --short HEAD)
LD_FLAGS = -X github.com/dymensionxyz/dymint/version.DymintGitCommitHash=$(COMMIT_HASH)
BUILD_FLAGS = -mod=readonly -ldflags "$(LD_FLAGS)"
CGO_ENABLED ?= 0
VERSION ?= $(shell git describe --tags --always)

LD_FLAGS = -X github.com/dymensionxyz/dymint/version.BuildVersion=$(VERSION)

# allow users to pass additional flags via the conventional LDFLAGS variable
LD_FLAGS += $(LDFLAGS)

# Process Docker environment varible TARGETPLATFORM
# in order to build binary with correspondent ARCH
178 changes: 178 additions & 0 deletions block/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package block

import (
"context"

"cosmossdk.io/errors"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
tmtypes "github.com/tendermint/tendermint/types"
)

// applyBlock applies the block to the store and the abci app.
// steps: save block -> execute block with app -> update state -> commit block to app -> update store height and state hash.
// As the entire process can't be atomic we need to make sure the following condition apply before
// we're applying the block in the happy path: block height - 1 == abci app last block height.
// In case the following doesn't hold true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
if block.Header.Height != m.store.Height()+1 {
// We crashed after the commit and before updating the store height.
return nil
}

m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.source)

// Check if alignment is needed due to incosistencies between the store and the app.
isAlignRequired, err := m.alignStoreWithApp(ctx, block)
if err != nil {
return err
}
if isAlignRequired {
m.logger.Debug("Aligned with app state required. Skipping to next block", "height", block.Header.Height)
return nil
}
// Start applying the block assuming no inconsistency was found.
_, err = m.store.SaveBlock(block, commit, nil)
if err != nil {
m.logger.Error("Failed to save block", "error", err)
return err
}

responses, err := m.executeBlock(ctx, block, commit)
if err != nil {
m.logger.Error("Failed to execute block", "error", err)
return err
}

newState, err := m.executor.UpdateStateFromResponses(responses, m.lastState, block)
if err != nil {
return err
}

batch := m.store.NewBatch()

batch, err = m.store.SaveBlockResponses(block.Header.Height, responses, batch)
if err != nil {
batch.Discard()
return err
}

m.lastState = newState
batch, err = m.store.UpdateState(m.lastState, batch)
if err != nil {
batch.Discard()
return err
}
batch, err = m.store.SaveValidators(block.Header.Height, m.lastState.Validators, batch)
if err != nil {
batch.Discard()
return err
}

err = batch.Commit()
if err != nil {
m.logger.Error("Failed to persist batch to disk", "error", err)
return err
}

// Commit block to app
retainHeight, err := m.executor.Commit(ctx, &newState, block, responses)
if err != nil {
m.logger.Error("Failed to commit to the block", "error", err)
return err
}

// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := m.pruneBlocks(retainHeight)
if err != nil {
m.logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
}

// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
newState.LastValidators = m.lastState.Validators.Copy()
newState.LastStoreHeight = block.Header.Height
newState.BaseHeight = m.store.Base()

_, err = m.store.UpdateState(newState, nil)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
return err
}
m.lastState = newState

m.store.SetHeight(block.Header.Height)

return nil
}

// alignStoreWithApp is responsible for aligning the state of the store and the abci app if necessary.
func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bool, error) {
isRequired := false
// Validate incosistency in height wasn't caused by a crash and if so handle it.
proxyAppInfo, err := m.executor.GetAppInfo()
if err != nil {
return isRequired, errors.Wrap(err, "failed to get app info")
}
if uint64(proxyAppInfo.LastBlockHeight) != block.Header.Height {
return isRequired, nil
}

isRequired = true
m.logger.Info("Skipping block application and only updating store height and state hash", "height", block.Header.Height)
// update the state with the hash, last store height and last validators.
m.lastState.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
m.lastState.LastStoreHeight = block.Header.Height
m.lastState.LastValidators = m.lastState.Validators.Copy()

resp, err := m.store.LoadBlockResponses(block.Header.Height)
if err != nil {
return isRequired, errors.Wrap(err, "failed to load block responses")
}
copy(m.lastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

_, err = m.store.UpdateState(m.lastState, nil)
if err != nil {
return isRequired, errors.Wrap(err, "failed to update state")
}
m.store.SetHeight(block.Header.Height)
return isRequired, nil
}

func (m *Manager) executeBlock(ctx context.Context, block *types.Block, commit *types.Commit) (*tmstate.ABCIResponses, error) {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.settlementClient.GetProposer()

if err := m.executor.Validate(m.lastState, block, commit, proposer); err != nil {
return &tmstate.ABCIResponses{}, err
}

responses, err := m.executor.Execute(ctx, m.lastState, block)
if err != nil {
return &tmstate.ABCIResponses{}, err
}

return responses, nil
}

func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
if err != nil {
m.logger.Error("Failed to marshal block", "error", err)
return err
}
if err := m.p2pClient.GossipBlock(ctx, gossipedBlockBytes); err != nil {
m.logger.Error("Failed to gossip block", "error", err)
return err
}
return nil

}
614 changes: 0 additions & 614 deletions block/manager.go

Large diffs are not rendered by default.

130 changes: 1 addition & 129 deletions block/manager_test.go
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ package block
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"sync/atomic"
"testing"
@@ -13,8 +12,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/dymensionxyz/dymint/log/test"
mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/settlement"
@@ -23,24 +20,17 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"

abci "github.com/tendermint/tendermint/abci/types"
tmcfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/proxy"
tmtypes "github.com/tendermint/tendermint/types"

"github.com/dymensionxyz/dymint/config"
"github.com/dymensionxyz/dymint/da"
mockda "github.com/dymensionxyz/dymint/da/mock"
nodemempool "github.com/dymensionxyz/dymint/node/mempool"
slregistry "github.com/dymensionxyz/dymint/settlement/registry"
"github.com/dymensionxyz/dymint/store"
)

const (
defaultBatchSize = 5
batchLimitBytes = 2000
)
const (
connectionRefusedErrorMessage = "connection refused"
batchNotFoundErrorMessage = "batch not found"
@@ -67,7 +57,7 @@ func TestInitialState(t *testing.T) {

// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, privKey, "TestChain", test.NewLogger(t))
p2pClient, err := p2p.NewClient(config.P2PConfig{}, privKey, "TestChain", logger)
assert.NoError(err)
assert.NotNil(p2pClient)

@@ -470,121 +460,3 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
})
}
}

/* -------------------------------------------------------------------------- */
/* utils */
/* -------------------------------------------------------------------------- */

func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerI, dalc da.DataAvailabilityLayerClient, genesisHeight int64, storeInitialHeight int64, storeLastBlockHeight int64, proxyAppConns proxy.AppConns, mockStore store.Store) (*Manager, error) {
genesis := testutil.GenerateGenesis(genesisHeight)
// Change the LastBlockHeight to avoid calling InitChainSync within the manager
// And updating the state according to the genesis.
state := testutil.GenerateState(storeInitialHeight, storeLastBlockHeight)
var managerStore store.Store
if mockStore == nil {
managerStore = store.New(store.NewDefaultInMemoryKVStore())
} else {
managerStore = mockStore
}
if _, err := managerStore.UpdateState(state, nil); err != nil {
return nil, err
}

logger := log.TestingLogger()
pubsubServer := pubsub.NewServer()
pubsubServer.Start()

// Init the settlement layer mock
if settlementlc == nil {
settlementlc = slregistry.GetClient(slregistry.Mock)
}
//TODO(omritoptix): Change the initialization. a bit dirty.
proposerKey, proposerPubKey, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
}
pubKeybytes, err := proposerPubKey.Raw()
if err != nil {
return nil, err
}

err = initSettlementLayerMock(settlementlc, hex.EncodeToString(pubKeybytes), pubsubServer, logger)
if err != nil {
return nil, err
}

if dalc == nil {
dalc = &mockda.DataAvailabilityLayerClient{}
}
initDALCMock(dalc, pubsubServer, logger)

var proxyApp proxy.AppConns
if proxyAppConns == nil {
proxyApp = testutil.GetABCIProxyAppMock(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, err
}
} else {
proxyApp = proxyAppConns
}

mp := mempoolv1.NewTxMempool(logger, tmcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0)
mpIDs := nodemempool.NewMempoolIDs()

// Init p2p client and validator
p2pKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, p2pKey, "TestChain", logger)
if err != nil {
return nil, err
}
p2pValidator := p2p.NewValidator(logger, pubsubServer)
p2pClient.SetTxValidator(p2pValidator.TxValidator(mp, mpIDs))
p2pClient.SetBlockValidator(p2pValidator.BlockValidator())

if err = p2pClient.Start(context.Background()); err != nil {
return nil, err
}

manager, err := NewManager(proposerKey, conf, genesis, managerStore, mp, proxyApp, dalc, settlementlc, nil,
pubsubServer, p2pClient, logger)
if err != nil {
return nil, err
}
return manager, nil
}

// TODO(omritoptix): Possible move out to a generic testutil
func getMockDALC(logger log.Logger) da.DataAvailabilityLayerClient {
dalc := &mockda.DataAvailabilityLayerClient{}
initDALCMock(dalc, pubsub.NewServer(), logger)
return dalc
}

// TODO(omritoptix): Possible move out to a generic testutil
func initDALCMock(dalc da.DataAvailabilityLayerClient, pubsubServer *pubsub.Server, logger log.Logger) {
_ = dalc.Init(nil, pubsubServer, store.NewDefaultInMemoryKVStore(), logger)
_ = dalc.Start()
}

// TODO(omritoptix): Possible move out to a generic testutil
func initSettlementLayerMock(settlementlc settlement.LayerI, proposer string, pubsubServer *pubsub.Server, logger log.Logger) error {
err := settlementlc.Init(settlement.Config{ProposerPubKey: proposer}, pubsubServer, logger)
if err != nil {
return err
}
err = settlementlc.Start()
if err != nil {
return err
}
return nil
}

func getManagerConfig() config.BlockManagerConfig {
return config.BlockManagerConfig{
BlockTime: 100 * time.Millisecond,
BlockBatchSize: defaultBatchSize,
BlockBatchMaxSizeBytes: 1000,
BatchSubmitMaxTime: 30 * time.Minute,
NamespaceID: "0102030405060708",
}
}
183 changes: 183 additions & 0 deletions block/produce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package block

import (
"context"
"fmt"
"sync/atomic"
"time"

"cosmossdk.io/errors"
abciconv "github.com/dymensionxyz/dymint/conv/abci"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
)

// waitForSync enforces the aggregator to be synced before it can produce blocks.
// It requires the retriveBlockLoop to be running.
func (m *Manager) waitForSync(ctx context.Context) error {
resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
// Set the syncTarget according to the result
if err == settlement.ErrBatchNotFound {
// Since we requested the latest batch and got batch not found it means
// the SL still hasn't got any batches for this chain.
m.logger.Info("No batches for chain found in SL. Start writing first batch")
atomic.StoreUint64(&m.syncTarget, uint64(m.genesis.InitialHeight-1))
return nil
} else if err != nil {
m.logger.Error("failed to retrieve batch from SL", "err", err)
return err
} else {
m.updateSyncParams(ctx, resultRetrieveBatch.EndHeight)
}
// Wait until isSynced is true and then call the PublishBlockLoop
m.isSyncedCond.L.Lock()
// Wait until we're synced and that we have got the latest batch (if we didn't, m.syncTarget == 0)
// before we start publishing blocks
for m.store.Height() < atomic.LoadUint64(&m.syncTarget) {
m.logger.Info("Waiting for sync", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
m.isSyncedCond.Wait()
}
m.isSyncedCond.L.Unlock()
m.logger.Info("Synced, Starting to produce", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
return nil
}

// ProduceBlockLoop is calling publishBlock in a loop as long as wer'e synced.
func (m *Manager) ProduceBlockLoop(ctx context.Context) {
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().Unix())

// We want to wait until we are synced. After that, since there is no leader
// election yet, and leader are elected manually, we will not be out of sync until
// we are manually being replaced.
err := m.waitForSync(ctx)
if err != nil {
panic(errors.Wrap(err, "failed to wait for sync"))
}

ticker := time.NewTicker(m.conf.BlockTime)
defer ticker.Stop()

var tickerEmptyBlocksMaxTime *time.Ticker
var tickerEmptyBlocksMaxTimeCh <-chan time.Time
// Setup ticker for empty blocks if enabled
if m.conf.EmptyBlocksMaxTime > 0 {
tickerEmptyBlocksMaxTime = time.NewTicker(m.conf.EmptyBlocksMaxTime)
tickerEmptyBlocksMaxTimeCh = tickerEmptyBlocksMaxTime.C
defer tickerEmptyBlocksMaxTime.Stop()
}

//Allow the initial block to be empty
produceEmptyBlock := true
for {
select {
//Context canceled
case <-ctx.Done():
return
// If we got a request for an empty block produce it and don't wait for the ticker
case <-m.produceEmptyBlockCh:
produceEmptyBlock = true
//Empty blocks timeout
case <-tickerEmptyBlocksMaxTimeCh:
m.logger.Debug(fmt.Sprintf("No transactions for %.2f seconds, producing empty block", m.conf.EmptyBlocksMaxTime.Seconds()))
produceEmptyBlock = true
//Produce block
case <-ticker.C:
err := m.produceBlock(ctx, produceEmptyBlock)
if err == types.ErrSkippedEmptyBlock {
// m.logger.Debug("Skipped empty block")
continue
}
if err != nil {
m.logger.Error("error while producing block", "error", err)
continue
}
//If empty blocks enabled, after block produced, reset the timeout timer
if tickerEmptyBlocksMaxTime != nil {
produceEmptyBlock = false
tickerEmptyBlocksMaxTime.Reset(m.conf.EmptyBlocksMaxTime)
}

//Node's health check channel
case shouldProduceBlocks := <-m.shouldProduceBlocksCh:
for !shouldProduceBlocks {
m.logger.Info("Stopped block production")
shouldProduceBlocks = <-m.shouldProduceBlocksCh
}
m.logger.Info("Resumed Block production")
}
}
}

func (m *Manager) produceBlock(ctx context.Context, allowEmpty bool) error {
m.produceBlockMutex.Lock()
defer m.produceBlockMutex.Unlock()
var lastCommit *types.Commit
var lastHeaderHash [32]byte
var err error
height := m.store.Height()
newHeight := height + 1

// this is a special case, when first block is produced - there is no previous commit
if newHeight == uint64(m.genesis.InitialHeight) {
lastCommit = &types.Commit{Height: height, HeaderHash: [32]byte{}}
} else {
lastCommit, err = m.store.LoadCommit(height)
if err != nil {
return fmt.Errorf("error while loading last commit: %w", err)
}
lastBlock, err := m.store.LoadBlock(height)
if err != nil {
return fmt.Errorf("error while loading last block: %w", err)
}
lastHeaderHash = lastBlock.Header.Hash()
}

var block *types.Block
// Check if there's an already stored block and commit at a newer height
// If there is use that instead of creating a new block
var commit *types.Commit
pendingBlock, err := m.store.LoadBlock(newHeight)
if err == nil {
m.logger.Info("Using pending block", "height", newHeight)
block = pendingBlock
commit, err = m.store.LoadCommit(newHeight)
if err != nil {
m.logger.Error("Loaded block but failed to load commit", "height", newHeight, "error", err)
return err
}
} else {
block = m.executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, m.lastState)
if !allowEmpty && len(block.Data.Txs) == 0 {
return types.ErrSkippedEmptyBlock
}

abciHeaderPb := abciconv.ToABCIHeaderPB(&block.Header)
abciHeaderBytes, err := abciHeaderPb.Marshal()
if err != nil {
return err
}
sign, err := m.proposerKey.Sign(abciHeaderBytes)
if err != nil {
return err
}
commit = &types.Commit{
Height: block.Header.Height,
HeaderHash: block.Header.Hash(),
Signatures: []types.Signature{sign},
}

}

// Gossip the block as soon as it is produced
if err := m.gossipBlock(ctx, *block, *commit); err != nil {
return err
}

if err := m.applyBlock(ctx, block, commit, blockMetaData{source: producedBlock}); err != nil {
return err
}

m.logger.Info("block created", "height", newHeight, "num_tx", len(block.Data.Txs))
rollappHeightGauge.Set(float64(newHeight))
return nil
}
101 changes: 101 additions & 0 deletions block/retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package block

import (
"context"
"fmt"
"sync/atomic"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/dymint/da"
)

// RetriveLoop listens for new sync messages written to a ring buffer and in turn
// runs syncUntilTarget on the latest message in the ring buffer.
func (m *Manager) RetriveLoop(ctx context.Context) {
m.logger.Info("Started retrieve loop")
syncTargetpoller := diodes.NewPoller(m.syncTargetDiode)
for {
select {
case <-ctx.Done():
return
default:
// Get only the latest sync target
syncTarget := syncTargetpoller.Next()
m.syncUntilTarget(ctx, *(*uint64)(syncTarget))
// Check if after we sync we are synced or a new syncTarget was already set.
// If we are synced then signal all goroutines waiting on isSyncedCond.
if m.store.Height() >= atomic.LoadUint64(&m.syncTarget) {
m.logger.Info("Synced at height", "height", m.store.Height())
m.isSyncedCond.L.Lock()
m.isSyncedCond.Signal()
m.isSyncedCond.L.Unlock()
}
}
}
}

// syncUntilTarget syncs the block until the syncTarget is reached.
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) {
currentHeight := m.store.Height()
for currentHeight < syncTarget {
m.logger.Info("Syncing until target", "current height", currentHeight, "syncTarget", syncTarget)
resultRetrieveBatch, err := m.settlementClient.RetrieveBatch(atomic.LoadUint64(&m.lastState.SLStateIndex) + 1)
if err != nil {
m.logger.Error("Failed to sync until target. error while retrieving batch", "error", err)
continue
}
err = m.processNextDABatch(ctx, resultRetrieveBatch.MetaData.DA.Height)
if err != nil {
m.logger.Error("Failed to sync until target. error while processing next DA batch", "error", err)
break
}
err = m.updateStateIndex(resultRetrieveBatch.StateIndex)
if err != nil {
return
}
currentHeight = m.store.Height()
}
}

func (m *Manager) updateStateIndex(stateIndex uint64) error {
atomic.StoreUint64(&m.lastState.SLStateIndex, stateIndex)
_, err := m.store.UpdateState(m.lastState, nil)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
return err
}
return nil
}

func (m *Manager) processNextDABatch(ctx context.Context, daHeight uint64) error {
m.logger.Debug("trying to retrieve batch from DA", "daHeight", daHeight)
batchResp, err := m.fetchBatch(daHeight)
if err != nil {
m.logger.Error("failed to retrieve batch from DA", "daHeight", daHeight, "error", err)
return err
}
m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daHeight)
for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
err := m.applyBlock(ctx, block, batch.Commits[i], blockMetaData{source: daBlock, daHeight: daHeight})
if err != nil {
return err
}
}
}
return nil
}

func (m *Manager) fetchBatch(daHeight uint64) (da.ResultRetrieveBatch, error) {
var err error
batchRes := m.retriever.RetrieveBatches(daHeight)
switch batchRes.Code {
case da.StatusError:
err = fmt.Errorf("failed to retrieve batch: %s", batchRes.Message)
case da.StatusTimeout:
err = fmt.Errorf("timeout during retrieve batch: %s", batchRes.Message)
}
return batchRes, err
}
146 changes: 146 additions & 0 deletions block/submit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package block

import (
"context"
"sync/atomic"
"time"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/types"
)

func (m *Manager) SubmitLoop(ctx context.Context) {
ticker := time.NewTicker(m.conf.BatchSubmitMaxTime)
defer ticker.Stop()

for {
select {
//Context canceled
case <-ctx.Done():
return
//TODO: add the case of batch size (should be signaled from the the block production)
// case <- requiredByNumOfBlocks
case <-ticker.C:
// SyncTarget is the height of the last block in the last batch as seen by this node.
syncTarget := atomic.LoadUint64(&m.syncTarget)
height := m.store.Height()
//no new blocks produced yet
if (height - syncTarget) == 0 {
continue
}

// Submit batch if we've reached the batch size and there isn't another batch currently in submission process.
if m.batchInProcess.Load() == true {
m.logger.Debug("Batch submission already in process, skipping submission")
continue
}

m.batchInProcess.Store(true)
// We try and produce an empty block to make sure releavnt ibc messages will pass through during the batch submission: https://github.com/dymensionxyz/research/issues/173.
err := m.produceBlock(ctx, true)
if err != nil {
m.logger.Error("error while producing empty block", "error", err)
}
m.submitNextBatch(ctx)
}
}
}

func (m *Manager) submitNextBatch(ctx context.Context) {
// Get the batch start and end height
startHeight := atomic.LoadUint64(&m.syncTarget) + 1
endHeight := uint64(m.lastState.LastBlockHeight)

isLastBlockEmpty, err := m.validateLastBlockInBatchIsEmpty(startHeight, endHeight)
if err != nil {
m.logger.Error("Failed to validate last block in batch is empty", "startHeight", startHeight, "endHeight", endHeight, "error", err)
return
}
if !isLastBlockEmpty {
m.logger.Info("Requesting for an empty block creation")
m.produceEmptyBlockCh <- true
}

// Create the batch
nextBatch, err := m.createNextDABatch(startHeight, endHeight)
if err != nil {
m.logger.Error("Failed to create next batch", "startHeight", startHeight, "endHeight", endHeight, "error", err)
return
}

actualEndHeight := nextBatch.EndHeight

// Submit batch to the DA
m.logger.Info("Submitting next batch", "startHeight", startHeight, "endHeight", actualEndHeight, "size", nextBatch.ToProto().Size())
resultSubmitToDA := m.dalc.SubmitBatch(nextBatch)
if resultSubmitToDA.Code != da.StatusSuccess {
panic("Failed to submit next batch to DA Layer")
}

// Submit batch to SL
// TODO(omritoptix): Handle a case where the SL submission fails due to syncTarget out of sync with the latestHeight in the SL.
// In that case we'll want to update the syncTarget before returning.
m.settlementClient.SubmitBatch(nextBatch, m.dalc.GetClientType(), &resultSubmitToDA)
}

func (m *Manager) createNextDABatch(startHeight uint64, endHeight uint64) (*types.Batch, error) {
var height uint64
// Create the batch
batchSize := endHeight - startHeight + 1
batch := &types.Batch{
StartHeight: startHeight,
EndHeight: endHeight,
Blocks: make([]*types.Block, 0, batchSize),
Commits: make([]*types.Commit, 0, batchSize),
}

// Populate the batch
for height = startHeight; height <= endHeight; height++ {
block, err := m.store.LoadBlock(height)
if err != nil {
m.logger.Error("Failed to load block", "height", height)
return nil, err
}
commit, err := m.store.LoadCommit(height)
if err != nil {
m.logger.Error("Failed to load commit", "height", height)
return nil, err
}

batch.Blocks = append(batch.Blocks, block)
batch.Commits = append(batch.Commits, commit)

//Check if the batch size is too big
totalSize := batch.ToProto().Size()
if totalSize > int(m.conf.BlockBatchMaxSizeBytes) {
// Nil out the last block and commit
batch.Blocks[len(batch.Blocks)-1] = nil
batch.Commits[len(batch.Commits)-1] = nil

// Remove the last block and commit from the batch
batch.Blocks = batch.Blocks[:len(batch.Blocks)-1]
batch.Commits = batch.Commits[:len(batch.Commits)-1]
break
}
}

batch.EndHeight = height - 1
return batch, nil
}

// Verify the last block in the batch is an empty block and that no ibc messages has accidentially passed through.
// This block may not be empty if another block has passed it in line. If that's the case our empty block request will
// be sent to the next batch.
func (m *Manager) validateLastBlockInBatchIsEmpty(startHeight uint64, endHeight uint64) (bool, error) {
m.logger.Debug("Verifying last block in batch is an empty block", "startHeight", startHeight, "endHeight", endHeight, "height")
lastBlock, err := m.store.LoadBlock(endHeight)
if err != nil {
m.logger.Error("Failed to load block", "height", endHeight, "error", err)
return false, err
}
if len(lastBlock.Data.Txs) != 0 {
m.logger.Info("Last block in batch is not an empty block", "startHeight", startHeight, "endHeight", endHeight, "height")
return false, nil
}
return true, nil
}
57 changes: 57 additions & 0 deletions block/synctarget.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package block

import (
"context"
"sync/atomic"
"time"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/dymint/settlement"
)

// SyncTargetLoop is responsible for getting real time updates about batches submission.
// for non aggregator: updating the sync target which will be used by retrieveLoop to sync until this target.
// for aggregator: get notification that batch has been accepted so can send next batch.
func (m *Manager) SyncTargetLoop(ctx context.Context) {
m.logger.Info("Started sync target loop")
subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewBatchAccepted)
if err != nil {
m.logger.Error("failed to subscribe to state update events")
panic(err)
}
// First time we start we want to get the latest batch from the SL
resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
if err != nil {
m.logger.Error("failed to retrieve batch from SL", "err", err)
} else {
m.updateSyncParams(ctx, resultRetrieveBatch.EndHeight)
}
for {
select {
case <-ctx.Done():
return
case event := <-subscription.Out():
eventData := event.Data().(*settlement.EventDataNewBatchAccepted)
m.updateSyncParams(ctx, eventData.EndHeight)
// In case we are the aggregator and we've got an update, then we can stop blocking from
// the next batches to be published. For non-aggregators this is not needed.
// We only want to send the next once the previous has been published successfully.
// TODO(omritoptix): Once we have leader election, we can add a condition.
// Update batch accepted is only relevant for the aggregator
// TODO(omritoptix): Check if we are the aggregator
m.batchInProcess.Store(false)
case <-subscription.Cancelled():
m.logger.Info("syncTargetLoop subscription canceled")
return
}
}
}

// updateSyncParams updates the sync target and state index if necessary
func (m *Manager) updateSyncParams(ctx context.Context, endHeight uint64) {
rollappHubHeightGauge.Set(float64(endHeight))
m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
atomic.StoreUint64(&m.syncTarget, endHeight)
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano())
m.syncTargetDiode.Set(diodes.GenericDataType(&endHeight))
}
151 changes: 151 additions & 0 deletions block/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package block

import (
"context"
"crypto/rand"
"encoding/hex"
"time"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/testutil"
"github.com/libp2p/go-libp2p/core/crypto"

"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/proxy"

"github.com/dymensionxyz/dymint/config"
"github.com/dymensionxyz/dymint/da"
mockda "github.com/dymensionxyz/dymint/da/mock"
mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
nodemempool "github.com/dymensionxyz/dymint/node/mempool"
slregistry "github.com/dymensionxyz/dymint/settlement/registry"
"github.com/dymensionxyz/dymint/store"
tmcfg "github.com/tendermint/tendermint/config"
)

const (
defaultBatchSize = 5
batchLimitBytes = 2000
)

/* -------------------------------------------------------------------------- */
/* utils */
/* -------------------------------------------------------------------------- */

func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerI, dalc da.DataAvailabilityLayerClient, genesisHeight int64, storeInitialHeight int64, storeLastBlockHeight int64, proxyAppConns proxy.AppConns, mockStore store.Store) (*Manager, error) {
genesis := testutil.GenerateGenesis(genesisHeight)
// Change the LastBlockHeight to avoid calling InitChainSync within the manager
// And updating the state according to the genesis.
state := testutil.GenerateState(storeInitialHeight, storeLastBlockHeight)
var managerStore store.Store
if mockStore == nil {
managerStore = store.New(store.NewDefaultInMemoryKVStore())
} else {
managerStore = mockStore
}
if _, err := managerStore.UpdateState(state, nil); err != nil {
return nil, err
}

logger := log.TestingLogger()
pubsubServer := pubsub.NewServer()
err := pubsubServer.Start()
if err != nil {
return nil, err
}
// Init the settlement layer mock
if settlementlc == nil {
settlementlc = slregistry.GetClient(slregistry.Mock)
}
//TODO(omritoptix): Change the initialization. a bit dirty.
proposerKey, proposerPubKey, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
}
pubKeybytes, err := proposerPubKey.Raw()
if err != nil {
return nil, err
}

err = initSettlementLayerMock(settlementlc, hex.EncodeToString(pubKeybytes), pubsubServer, logger)
if err != nil {
return nil, err
}

if dalc == nil {
dalc = &mockda.DataAvailabilityLayerClient{}
}
initDALCMock(dalc, pubsubServer, logger)

var proxyApp proxy.AppConns
if proxyAppConns == nil {
proxyApp = testutil.GetABCIProxyAppMock(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, err
}
} else {
proxyApp = proxyAppConns
}

mp := mempoolv1.NewTxMempool(logger, tmcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0)
mpIDs := nodemempool.NewMempoolIDs()

// Init p2p client and validator
p2pKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, p2pKey, "TestChain", logger)
if err != nil {
return nil, err
}
p2pValidator := p2p.NewValidator(logger, pubsubServer)
p2pClient.SetTxValidator(p2pValidator.TxValidator(mp, mpIDs))
p2pClient.SetBlockValidator(p2pValidator.BlockValidator())

if err = p2pClient.Start(context.Background()); err != nil {
return nil, err
}

manager, err := NewManager(proposerKey, conf, genesis, managerStore, mp, proxyApp, dalc, settlementlc, nil,
pubsubServer, p2pClient, logger)
if err != nil {
return nil, err
}
return manager, nil
}

// TODO(omritoptix): Possible move out to a generic testutil
func getMockDALC(logger log.Logger) da.DataAvailabilityLayerClient {
dalc := &mockda.DataAvailabilityLayerClient{}
initDALCMock(dalc, pubsub.NewServer(), logger)
return dalc
}

// TODO(omritoptix): Possible move out to a generic testutil
func initDALCMock(dalc da.DataAvailabilityLayerClient, pubsubServer *pubsub.Server, logger log.Logger) {
_ = dalc.Init(nil, pubsubServer, store.NewDefaultInMemoryKVStore(), logger)
_ = dalc.Start()
}

// TODO(omritoptix): Possible move out to a generic testutil
func initSettlementLayerMock(settlementlc settlement.LayerI, proposer string, pubsubServer *pubsub.Server, logger log.Logger) error {
err := settlementlc.Init(settlement.Config{ProposerPubKey: proposer}, pubsubServer, logger)
if err != nil {
return err
}
err = settlementlc.Start()
if err != nil {
return err
}
return nil
}

func getManagerConfig() config.BlockManagerConfig {
return config.BlockManagerConfig{
BlockTime: 100 * time.Millisecond,
BlockBatchSize: defaultBatchSize,
BlockBatchMaxSizeBytes: 1000,
BatchSubmitMaxTime: 30 * time.Minute,
NamespaceID: "0102030405060708",
}
}
2 changes: 0 additions & 2 deletions config/defaults.go
Original file line number Diff line number Diff line change
@@ -10,8 +10,6 @@ import (
const (
// DefaultListenAddress is a default listen address for P2P client.
DefaultListenAddress = "/ip4/0.0.0.0/tcp/7676"
// Version is a default dymint version for P2P client.
Version = "0.2.2"

DefaultHomeDir = "sequencer_keys"
DefaultChainID = "dymint-testnet"
29 changes: 18 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ require (
github.com/celestiaorg/go-cnc v0.4.2
github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.12
github.com/dgraph-io/badger/v3 v3.2103.3
github.com/dymensionxyz/cosmosclient v0.4.0-beta
github.com/dymensionxyz/cosmosclient v0.4.1-beta
github.com/dymensionxyz/dymension v0.2.0-beta.0.20230607115558-745644a96ea6
github.com/go-kit/kit v0.12.0
github.com/gofrs/uuid v4.3.0+incompatible
@@ -27,7 +27,7 @@ require (
github.com/prometheus/client_golang v1.14.0
github.com/rs/cors v1.8.3
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.14.0
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.4
github.com/tendermint/tendermint v0.34.28
go.uber.org/multierr v1.8.0
@@ -41,24 +41,28 @@ require (
cosmossdk.io/math v1.0.0-rc.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/Workiva/go-datastructures v1.0.53 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/btcsuite/btcd v0.22.2 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce // indirect
github.com/cometbft/cometbft-db v0.7.0 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.2 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.3 // indirect
github.com/cosmos/gogoproto v1.4.8 // indirect
github.com/creachadair/taskgroup v0.3.2 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
github.com/decred/base58 v1.0.4 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/ethereum/go-ethereum v1.12.0 // indirect
github.com/evmos/evmos/v12 v12.1.6 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/validator/v10 v10.4.1 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
@@ -67,7 +71,7 @@ require (
github.com/lib/pq v1.10.7 // indirect
github.com/libp2p/go-yamux/v4 v4.0.0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/onsi/ginkgo/v2 v2.5.1 // indirect
github.com/onsi/ginkgo/v2 v2.9.0 // indirect
github.com/pierrec/xxHash v0.1.5 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.2.1 // indirect
@@ -79,7 +83,11 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect
github.com/tidwall/btree v1.5.0 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/vedhavyas/go-subkey v1.0.3 // indirect
github.com/zondax/ledger-go v0.14.1 // indirect
go.uber.org/dig v1.15.0 // indirect
@@ -172,7 +180,7 @@ require (
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.2.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
@@ -196,7 +204,6 @@ require (
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.7 // indirect
github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -207,12 +214,12 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/sasha-s/go-deadlock v0.3.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
github.com/tendermint/go-amino v0.16.0 // indirect
github.com/tendermint/tm-db v0.6.7 // indirect
105 changes: 76 additions & 29 deletions go.sum

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions rpc/client/client.go
Original file line number Diff line number Diff line change
@@ -4,13 +4,14 @@ import (
"context"
"errors"

"github.com/dymensionxyz/dymint/version"

"fmt"
"sort"
"time"

sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"

rconfig "github.com/dymensionxyz/dymint/config"
abciconv "github.com/dymensionxyz/dymint/conv/abci"
"github.com/dymensionxyz/dymint/mempool"
"github.com/dymensionxyz/dymint/node"
@@ -26,7 +27,7 @@ import (
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
tm_version "github.com/tendermint/tendermint/version"
)

const (
@@ -88,7 +89,7 @@ func (c *Client) ABCIQueryWithOptions(ctx context.Context, path string, data tmb
if err != nil {
return nil, err
}
c.Logger.Debug("ABCIQuery", "path", path, "data", data, "result", resQuery)
c.Logger.Debug("ABCIQuery", "path", path, "height", resQuery.Height)
return &ctypes.ResultABCIQuery{Response: *resQuery}, nil
}

@@ -718,7 +719,7 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
return nil, fmt.Errorf("failed to load the last saved state: %w", err)
}
defaultProtocolVersion := p2p.NewProtocolVersion(
version.P2PProtocol,
tm_version.P2PProtocol,
state.Version.Consensus.Block,
state.Version.Consensus.App,
)
@@ -732,7 +733,7 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
DefaultNodeID: id,
ListenAddr: addr,
Network: network,
Version: rconfig.Version,
Version: version.BuildVersion,
Channels: []byte{0x1},
Moniker: config.DefaultBaseConfig().Moniker,
Other: p2p.DefaultNodeInfoOther{
2 changes: 1 addition & 1 deletion settlement/base.go
Original file line number Diff line number Diff line change
@@ -179,7 +179,7 @@ func (b *BaseLayerClient) stateUpdatesHandler(ready chan bool) {
utils.SubmitEventOrPanic(b.ctx, b.pubsub, newBatchEventData,
map[string][]string{EventTypeKey: {EventNewBatchAccepted}})
case <-subscription.Cancelled():
b.logger.Info("subscription canceled")
b.logger.Info("stateUpdatesHandler subscription canceled")
return
case <-b.ctx.Done():
b.logger.Info("Context done. Exiting state update handler")
3 changes: 2 additions & 1 deletion utils/events.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,8 @@ func SubscribeAndHandleEvents(ctx context.Context, pubsubServer *pubsub.Server,
case event := <-subscription.Out():
callback(event)
case <-subscription.Cancelled():
logger.Info("Subscription canceled")
logger.Info(clientID + " subscription canceled")
return
}
}
}
3 changes: 3 additions & 0 deletions version/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package version

var BuildVersion = "<version>"

0 comments on commit 65bc962

Please sign in to comment.