Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: block-stm is not integrated in sdk #1022

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecT
if app.txExecutor != nil {
return app.txExecutor(ctx, txs, app.finalizeBlockState.ms, func(i int, memTx sdk.Tx, ms storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(txs[i], memTx, i, ms, incarnationCache)
})
}, app.txResponsePatcher)
}

txResults := make([]*abci.ExecTxResult, 0, len(txs))
Expand Down
3 changes: 3 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ type BaseApp struct {

// Optional alternative tx executor, used for block-stm parallel transaction execution.
txExecutor TxExecutor

// Optional alternative tx response patcher, used for block-stm parallel transaction execution.
txResponsePatcher TxResponsePatcher
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
10 changes: 10 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
return func(app *BaseApp) { app.txExecutor = executor }
}

// SetTxResponsePatcher sets a custom tx response patcher for the BaseApp, usually for parallel execution.
func SetTxResponsePatcher(patcher TxResponsePatcher) func(*BaseApp) {
return func(app *BaseApp) { app.txResponsePatcher = patcher }
}

func (app *BaseApp) SetName(name string) {
if app.sealed {
panic("SetName() on sealed BaseApp")
Expand Down Expand Up @@ -393,6 +398,11 @@ func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
app.txExecutor = executor
}

// SetTxResponsePatcher sets a custom tx response patcher for the BaseApp, usually for parallel execution.
func (app *BaseApp) SetTxResponsePatcher(patcher TxResponsePatcher) {
app.txResponsePatcher = patcher
}

// SetMsgServiceRouter sets the MsgServiceRouter of a BaseApp.
func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
app.msgServiceRouter = msgServiceRouter
Expand Down
145 changes: 145 additions & 0 deletions baseapp/txexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,161 @@ package baseapp

import (
"context"
"io"
"sync/atomic"

abci "github.com/cometbft/cometbft/abci/types"
sdk "github.com/cosmos/cosmos-sdk/types"

"cosmossdk.io/store/cachemulti"
"cosmossdk.io/store/types"
blockstm "github.com/crypto-org-chain/go-block-stm"
)

type TxResponsePatcher interface {
Patch(input []*abci.ExecTxResult) []*abci.ExecTxResult
}

type stmMultiStoreWrapper struct {
types.MultiStore
}

var _ types.MultiStore = stmMultiStoreWrapper{}

type msWrapper struct {
blockstm.MultiStore
}

var _ types.MultiStore = msWrapper{}

func (ms msWrapper) getCacheWrapper(key types.StoreKey) types.CacheWrapper {
return ms.GetStore(key)
}

func (ms msWrapper) CacheMultiStore() types.CacheMultiStore {
return cachemulti.NewFromParent(ms.getCacheWrapper, nil, nil)
}

// Implements CacheWrapper.
func (ms msWrapper) CacheWrap() types.CacheWrap {
return ms.CacheMultiStore().(types.CacheWrap)
}

// GetStoreType returns the type of the store.
func (ms msWrapper) GetStoreType() types.StoreType {
return types.StoreTypeMulti
}

// Implements interface MultiStore
func (ms msWrapper) SetTracer(io.Writer) types.MultiStore {
return nil
}

// Implements interface MultiStore
func (ms msWrapper) SetTracingContext(types.TraceContext) types.MultiStore {
return nil
}

// Implements interface MultiStore
func (ms msWrapper) TracingEnabled() bool {
return false
}

type TxExecutor func(
ctx context.Context,
block [][]byte,
cms types.MultiStore,
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
patcher TxResponsePatcher,
) ([]*abci.ExecTxResult, error)

func DefaultTxExecutor(_ context.Context,
txs [][]byte,
ms types.MultiStore,
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
patcher TxResponsePatcher,
) ([]*abci.ExecTxResult, error) {
blockSize := len(txs)
results := make([]*abci.ExecTxResult, blockSize)
for i := 0; i < blockSize; i++ {
results[i] = deliverTxWithMultiStore(i, nil, ms, nil)
}
if patcher != nil {
return patcher.Patch(results), nil
}
return results, nil
}

func STMTxExecutor(
stores []types.StoreKey,
workers int,
txDecoder sdk.TxDecoder,
preEstimates func(txs [][]byte, workers int, txDecoder sdk.TxDecoder, ms types.MultiStore) ([]sdk.Tx, []blockstm.MultiLocations),
) TxExecutor {
index := make(map[types.StoreKey]int, len(stores))
for i, k := range stores {
index[k] = i
}
return func(
ctx context.Context,
txs [][]byte,
ms types.MultiStore,
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
patcher TxResponsePatcher,
) ([]*abci.ExecTxResult, error) {
blockSize := len(txs)
if blockSize == 0 {
return nil, nil
}
results := make([]*abci.ExecTxResult, blockSize)
incarnationCache := make([]atomic.Pointer[map[string]any], blockSize)
for i := 0; i < blockSize; i++ {
m := make(map[string]any)
incarnationCache[i].Store(&m)
}

var (
estimates []blockstm.MultiLocations
memTxs []sdk.Tx
)
if preEstimates != nil {
// pre-estimation
memTxs, estimates = preEstimates(txs, workers, txDecoder, ms)
}

if err := blockstm.ExecuteBlockWithEstimates(
ctx,
blockSize,
index,
stmMultiStoreWrapper{ms},
workers,
estimates,
func(txn blockstm.TxnIndex, ms blockstm.MultiStore) {
var cache map[string]any

// only one of the concurrent incarnations gets the cache if there are any, otherwise execute without
// cache, concurrent incarnations should be rare.
v := incarnationCache[txn].Swap(nil)
if v != nil {
cache = *v
}

var memTx sdk.Tx
if memTxs != nil {
memTx = memTxs[txn]
}
results[txn] = deliverTxWithMultiStore(int(txn), memTx, msWrapper{ms}, cache)

if v != nil {
incarnationCache[txn].Store(v)
}
},
); err != nil {
return nil, err
}
if patcher != nil {
return patcher.Patch(results), nil
}
return results, nil
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/cosmos/gogogateway v1.2.0
github.com/cosmos/gogoproto v1.7.0
github.com/cosmos/ledger-cosmos-go v0.13.3
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.4
Expand Down Expand Up @@ -184,6 +185,7 @@ replace (
github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.9.1
// replace broken goleveldb
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tidwall/btree => github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c
)

retract (
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c h1:MOgfS4+FBB8cMkDE2j2VBVsbY+HCkPIu0YsJ/9bbGeQ=
github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716 h1:OvD5Rm0B6LHUJk6z858UgwdP72jU2DuUdXeclRyKpDI=
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716/go.mod h1:iwQTX9xMX8NV9k3o2BiWXA0SswpsZrDk5q3gA7nWYiE=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -709,8 +713,8 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/tendermint/go-amino v0.16.0 h1:GyhmgQKvqF82e2oZeuMSp9JTN0N09emoSZlb2lyGa2E=
github.com/tendermint/go-amino v0.16.0/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
Expand Down
25 changes: 24 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"

"github.com/cometbft/cometbft/libs/strings"
"github.com/spf13/viper"

pruningtypes "cosmossdk.io/store/pruning/types"
Expand All @@ -29,6 +30,9 @@ const (
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
DefaultGRPCMaxSendMsgSize = math.MaxInt32

BlockExecutorSequential = "sequential"
BlockExecutorBlockSTM = "block-stm"
)

// BaseConfig defines the server's basic configuration
Expand Down Expand Up @@ -182,6 +186,16 @@ type (
}
)

// BlockStmConfig defines the block stm configuration.
type BlockSTMConfig struct {
// Executor sets the executor type, "block-stm" for parallel execution, "sequential" for sequential execution.
Executor string `mapstructure:"executor"`
// Workers is the number of workers for block-stm execution, 0 means using all available CPUs.
Workers uint64 `mapstructure:"workers"`
// PreEstimate is the flag to enable pre-estimation for block-stm execution.
PreEstimate bool `mapstructure:"pre-estimate"`
}

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`
Expand All @@ -194,6 +208,7 @@ type Config struct {
StateSync StateSyncConfig `mapstructure:"state-sync"`
Streaming StreamingConfig `mapstructure:"streaming"`
Mempool MempoolConfig `mapstructure:"mempool"`
BlockSTM BlockSTMConfig `mapstructure:"block-stm"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -265,6 +280,11 @@ func DefaultConfig() *Config {
Mempool: MempoolConfig{
MaxTxs: -1,
},
BlockSTM: BlockSTMConfig{
Executor: BlockExecutorSequential,
Workers: 0,
PreEstimate: false,
},
}
}

Expand All @@ -287,6 +307,9 @@ func (c Config) ValidateBasic() error {
"cannot enable state sync snapshots with '%s' pruning setting", pruningtypes.PruningOptionEverything,
)
}

blockExecutors := []string{BlockExecutorSequential, BlockExecutorBlockSTM}
if c.BlockSTM.Executor != "" && !strings.StringInSlice(c.BlockSTM.Executor, blockExecutors) {
return fmt.Errorf("invalid block executor type %s, available types: %v", c.BlockSTM.Executor, blockExecutors)
}
return nil
}
15 changes: 15 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,21 @@ stop-node-on-err = {{ .Streaming.ABCI.StopNodeOnErr }}
# Note, this configuration only applies to SDK built-in app-side mempool
# implementations.
max-txs = {{ .Mempool.MaxTxs }}

###############################################################################
### Block STM ###
###############################################################################

[block-stm]

# Executor sets the executor type, "block-stm" for parallel execution, "sequential" for sequential execution.
executor = "{{ .BlockSTM.Executor }}"

# STMWorkers is the number of workers for block-stm execution, 0 means using all available CPUs.
workers = {{ .BlockSTM.Workers }}

# PreEstimate is the flag to enable pre-estimation for block-stm execution.
pre-estimate = {{ .BlockSTM.PreEstimate }}
`

var configTemplate *template.Template
Expand Down
8 changes: 8 additions & 0 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ const (
// mempool flags
FlagMempoolMaxTxs = "mempool.max-txs"

// block-stm flags
FlagBlockSTMExecutor = "block-stm.executor"
FlagBlockSTMWorkers = "block-stm.workers"
FlagBlockSTMPreEstimate = "block-stm.pre-estimate"

// testnet keys
KeyIsTestnet = "is-testnet"
KeyNewChainID = "new-chain-ID"
Expand Down Expand Up @@ -997,6 +1002,9 @@ func addStartNodeFlags(cmd *cobra.Command, opts StartCmdOptions) {
cmd.Flags().Uint32(FlagStateSyncSnapshotKeepRecent, 2, "State sync snapshot to keep")
cmd.Flags().Bool(FlagDisableIAVLFastNode, false, "Disable fast node for IAVL tree")
cmd.Flags().Int(FlagMempoolMaxTxs, mempool.DefaultMaxTx, "Sets MaxTx value for the app-side mempool")
cmd.Flags().String(FlagBlockSTMExecutor, serverconfig.BlockExecutorSequential, "Sets the executor type (block-stm|sequential)")
cmd.Flags().Int(FlagBlockSTMWorkers, 0, "Sets the number of workers for block-stm execution, 0 means using all available CPUs")
cmd.Flags().Bool(FlagBlockSTMPreEstimate, false, "Sets the flag to enable pre-estimation for block-stm execution")
cmd.Flags().Duration(FlagShutdownGrace, 0*time.Second, "On Shutdown, duration to wait for resource clean up")

// support old flags name for backwards compatibility
Expand Down
2 changes: 2 additions & 0 deletions simapp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ require (
github.com/cosmos/ledger-cosmos-go v0.13.3 // indirect
github.com/creachadair/atomicfile v0.3.1 // indirect
github.com/creachadair/tomledit v0.0.24 // indirect
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
Expand Down Expand Up @@ -219,4 +220,5 @@ replace (
github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.9.1
// replace broken goleveldb
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tidwall/btree => github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c
)
8 changes: 6 additions & 2 deletions simapp/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ github.com/creachadair/tomledit v0.0.24 h1:5Xjr25R2esu1rKCbQEmjZYlrhFkDspoAbAKb6
github.com/creachadair/tomledit v0.0.24/go.mod h1:9qHbShRWQzSCcn617cMzg4eab1vbLCOjOshAWSzWr8U=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c h1:MOgfS4+FBB8cMkDE2j2VBVsbY+HCkPIu0YsJ/9bbGeQ=
github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716 h1:OvD5Rm0B6LHUJk6z858UgwdP72jU2DuUdXeclRyKpDI=
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716/go.mod h1:iwQTX9xMX8NV9k3o2BiWXA0SswpsZrDk5q3gA7nWYiE=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -1009,8 +1013,8 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/tendermint/go-amino v0.16.0 h1:GyhmgQKvqF82e2oZeuMSp9JTN0N09emoSZlb2lyGa2E=
github.com/tendermint/go-amino v0.16.0/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
Expand Down
Loading
Loading