From d78c893dcfcf0adcf58dc2473921330d386f7dfe Mon Sep 17 00:00:00 2001 From: yihuang Date: Fri, 22 Mar 2024 09:14:32 +0800 Subject: [PATCH] Problem: parallel tx execution is not supported (#205) add basic support in sdk: - add a TxExecutor baseapp option - add TxIndex/TxCount/MsgIndex in context Update CHANGELOG.md Signed-off-by: yihuang fix misspell fix lint run gci fix lint gci seems not compatible with gofumpt --- .golangci.yml | 9 ---- CHANGELOG.md | 8 +++- baseapp/abci.go | 91 ++++++++++++++++++++++++++--------------- baseapp/baseapp.go | 27 +++++++++--- baseapp/genesis.go | 2 +- baseapp/options.go | 10 +++++ baseapp/test_helpers.go | 4 +- baseapp/txexecutor.go | 16 ++++++++ types/context.go | 35 ++++++++++++++++ 9 files changed, 151 insertions(+), 51 deletions(-) create mode 100644 baseapp/txexecutor.go diff --git a/.golangci.yml b/.golangci.yml index 3f61e47a961e..7f51609abf1d 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -20,7 +20,6 @@ linters: - exportloopref - goconst - gocritic - - gci - gofumpt - gosec - gosimple @@ -63,14 +62,6 @@ issues: max-same-issues: 10000 linters-settings: - gci: - custom-order: true - sections: - - standard # Standard section: captures all standard packages. - - default # Default section: contains all imports that could not be matched to another section type. - - prefix(cosmossdk.io) - - prefix(github.com/cosmos/cosmos-sdk) - gosec: # To select a subset of rules to run. # Available rules: https://github.com/securego/gosec#available-rules diff --git a/CHANGELOG.md b/CHANGELOG.md index ec5b80873dff..cb45d1180abe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,7 +38,13 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [Unreleased] -## Bug Fixes +### Features + +* (baseapp) [#205](https://github.com/crypto-org-chain/cosmos-sdk/pull/205) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support tx parallel execution. + +## [Unreleased-Upstream] + +### Bug Fixes * (crypto) [#19691](https://github.com/cosmos/cosmos-sdk/pull/19745) Fix tx sign doesn't throw an error when incorrect Ledger is used. diff --git a/baseapp/abci.go b/baseapp/abci.go index f80d4e2c3ab7..04ef73f002f4 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -336,11 +336,11 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { var mode execMode - switch { - case req.Type == abci.CheckTxType_New: + switch req.Type { + case abci.CheckTxType_New: mode = execModeCheck - case req.Type == abci.CheckTxType_Recheck: + case abci.CheckTxType_Recheck: mode = execModeReCheck default: @@ -775,48 +775,34 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request // Reset the gas meter so that the AnteHandlers aren't required to gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context()) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) + app.finalizeBlockState.SetContext( + app.finalizeBlockState.Context(). + WithBlockGasMeter(gasMeter). + WithTxCount(len(req.Txs)), + ) // Iterate over all raw transactions in the proposal and attempt to execute // them, gathering the execution results. // // NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g. // vote extensions, so skip those. - txResults := make([]*abci.ExecTxResult, 0, len(req.Txs)) - for _, rawTx := range req.Txs { - var response *abci.ExecTxResult - - if _, err := app.txDecoder(rawTx); err == nil { - response = app.deliverTx(rawTx) - } else { - // In the case where a transaction included in a block proposal is malformed, - // we still want to return a default response to comet. This is because comet - // expects a response for each transaction included in a block proposal. - response = sdkerrors.ResponseExecTxResultWithEvents( - sdkerrors.ErrTxDecode, - 0, - 0, - nil, - false, - ) - } - - // check after every tx if we should abort - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - // continue - } - - txResults = append(txResults, response) + txResults, err := app.executeTxs(ctx, req.Txs) + if err != nil { + // usually due to canceled + return nil, err } if app.finalizeBlockState.ms.TracingEnabled() { app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) } - endBlock, err := app.endBlock(app.finalizeBlockState.Context()) + var blockGasUsed uint64 + for _, res := range txResults { + blockGasUsed += uint64(res.GasUsed) + } + sdkCtx := app.finalizeBlockState.Context().WithBlockGasUsed(blockGasUsed) + + endBlock, err := app.endBlock(sdkCtx) if err != nil { return nil, err } @@ -840,6 +826,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request }, nil } +func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) { + if app.txExecutor != nil { + return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult { + return app.deliverTxWithMultiStore(txs[i], i, ms) + }) + } + + txResults := make([]*abci.ExecTxResult, 0, len(txs)) + for i, rawTx := range txs { + var response *abci.ExecTxResult + + if _, err := app.txDecoder(rawTx); err == nil { + response = app.deliverTx(rawTx, i) + } else { + // In the case where a transaction included in a block proposal is malformed, + // we still want to return a default response to comet. This is because comet + // expects a response for each transaction included in a block proposal. + response = sdkerrors.ResponseExecTxResultWithEvents( + sdkerrors.ErrTxDecode, + 0, + 0, + nil, + false, + ) + } + + // check after every tx if we should abort + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + // continue + } + + txResults = append(txResults, response) + } + return txResults, nil +} + // FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock. // Specifically, it will execute an application's BeginBlock (if defined), followed // by the transactions in the proposal, finally followed by the application's diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index c7e6e616f497..998cd19fa25b 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -193,6 +193,9 @@ type BaseApp struct { // // SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler. disableBlockGasMeter bool + + // Optional alternative tx executor, used for block-stm parallel transaction execution. + txExecutor TxExecutor } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a @@ -659,13 +662,14 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter { } // retrieve the context for the tx w/ txBytes and other memoized values. -func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context { +func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context { modeState := app.getState(mode) if modeState == nil { panic(fmt.Sprintf("state is nil for mode %v", mode)) } ctx := modeState.Context(). - WithTxBytes(txBytes) + WithTxBytes(txBytes). + WithTxIndex(txIndex) // WithVoteInfos(app.voteInfos) // TODO: identify if this is needed ctx = ctx.WithConsensusParams(app.GetConsensusParams(ctx)) @@ -746,7 +750,11 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock, return resp, nil } -func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult { +func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult { + return app.deliverTxWithMultiStore(tx, txIndex, nil) +} + +func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult { gInfo := sdk.GasInfo{} resultStr := "successful" @@ -759,7 +767,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult { telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted") }() - gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx) + gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, txIndex, txMultiStore) if err != nil { resultStr = "failed" resp = sdkerrors.ResponseExecTxResultWithEvents( @@ -817,12 +825,19 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) { // returned if the tx does not run out of gas and if all the messages are valid // and execute successfully. An error is returned otherwise. func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) { + return app.runTxWithMultiStore(mode, txBytes, -1, nil) +} + +func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) { // NOTE: GasWanted should be returned by the AnteHandler. GasUsed is // determined by the GasMeter. We need access to the context to get the gas // meter, so we initialize upfront. var gasWanted uint64 - ctx := app.getContextForTx(mode, txBytes) + ctx := app.getContextForTx(mode, txBytes, txIndex) + if txMultiStore != nil { + ctx = ctx.WithMultiStore(txMultiStore) + } ms := ctx.MultiStore() // only run the tx if there is block gas remaining @@ -1000,6 +1015,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me break } + ctx = ctx.WithMsgIndex(i) + handler := app.msgServiceRouter.Handler(msg) if handler == nil { return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg) diff --git a/baseapp/genesis.go b/baseapp/genesis.go index 4a6b0082b656..e9f611772e08 100644 --- a/baseapp/genesis.go +++ b/baseapp/genesis.go @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil) // ExecuteGenesisTx implements genesis.GenesisState from // cosmossdk.io/core/genesis to set initial state in genesis func (ba BaseApp) ExecuteGenesisTx(tx []byte) error { - res := ba.deliverTx(tx) + res := ba.deliverTx(tx, -1) if res.Code != types.CodeTypeOK { return errors.New(res.Log) diff --git a/baseapp/options.go b/baseapp/options.go index 08cbf77ee297..13f5c9afb580 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -122,6 +122,11 @@ func DisableBlockGasMeter() func(*BaseApp) { return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) } } +// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution. +func SetTxExecutor(executor TxExecutor) func(*BaseApp) { + return func(app *BaseApp) { app.txExecutor = executor } +} + func (app *BaseApp) SetName(name string) { if app.sealed { panic("SetName() on sealed BaseApp") @@ -372,3 +377,8 @@ func (app *BaseApp) SetStreamingManager(manager storetypes.StreamingManager) { func (app *BaseApp) SetDisableBlockGasMeter(disableBlockGasMeter bool) { app.disableBlockGasMeter = disableBlockGasMeter } + +// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution. +func (app *BaseApp) SetTxExecutor(executor TxExecutor) { + app.txExecutor = executor +} diff --git a/baseapp/test_helpers.go b/baseapp/test_helpers.go index db603f2f2982..53fd891c4823 100644 --- a/baseapp/test_helpers.go +++ b/baseapp/test_helpers.go @@ -70,9 +70,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s } func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context { - return app.getContextForTx(execModeFinalize, txBytes) + return app.getContextForTx(execModeFinalize, txBytes, -1) } func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context { - return app.getContextForTx(execModeCheck, txBytes) + return app.getContextForTx(execModeCheck, txBytes, -1) } diff --git a/baseapp/txexecutor.go b/baseapp/txexecutor.go new file mode 100644 index 000000000000..230250ce9992 --- /dev/null +++ b/baseapp/txexecutor.go @@ -0,0 +1,16 @@ +package baseapp + +import ( + "context" + + abci "github.com/cometbft/cometbft/abci/types" + + "cosmossdk.io/store/types" +) + +type TxExecutor func( + ctx context.Context, + blockSize int, + cms types.MultiStore, + deliverTxWithMultiStore func(int, types.MultiStore) *abci.ExecTxResult, +) ([]*abci.ExecTxResult, error) diff --git a/types/context.go b/types/context.go index c07689e5b12b..36c5ccdc8094 100644 --- a/types/context.go +++ b/types/context.go @@ -64,6 +64,15 @@ type Context struct { streamingManager storetypes.StreamingManager cometInfo comet.BlockInfo headerInfo header.Info + + // the index of the current tx in the block, -1 means not in finalize block context + txIndex int + // the index of the current msg in the tx, -1 means not in finalize block context + msgIndex int + // the total number of transactions in current block + txCount int + // sum the gas used by all the transactions in the current block, only accessible by end blocker + blockGasUsed uint64 } // Proposed rename, not done to avoid API breakage @@ -91,6 +100,10 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager } func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo } func (c Context) HeaderInfo() header.Info { return c.headerInfo } +func (c Context) TxIndex() int { return c.txIndex } +func (c Context) MsgIndex() int { return c.msgIndex } +func (c Context) TxCount() int { return c.txCount } +func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed } // clone the header before returning func (c Context) BlockHeader() cmtproto.Header { @@ -137,6 +150,8 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool eventManager: NewEventManager(), kvGasConfig: storetypes.KVGasConfig(), transientKVGasConfig: storetypes.TransientGasConfig(), + txIndex: -1, + msgIndex: -1, } } @@ -310,6 +325,26 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context { return c } +func (c Context) WithTxIndex(txIndex int) Context { + c.txIndex = txIndex + return c +} + +func (c Context) WithTxCount(txCount int) Context { + c.txCount = txCount + return c +} + +func (c Context) WithMsgIndex(msgIndex int) Context { + c.msgIndex = msgIndex + return c +} + +func (c Context) WithBlockGasUsed(gasUsed uint64) Context { + c.blockGasUsed = gasUsed + return c +} + // TODO: remove??? func (c Context) IsZero() bool { return c.ms == nil