Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: parallel tx execution is not supported #205

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ linters:
- exportloopref
- goconst
- gocritic
- gci
- gofumpt
- gosec
- gosimple
Expand Down Expand Up @@ -63,14 +62,6 @@ issues:
max-same-issues: 10000

linters-settings:
gci:
custom-order: true
sections:
- standard # Standard section: captures all standard packages.
- default # Default section: contains all imports that could not be matched to another section type.
- prefix(cosmossdk.io)
- prefix(github.com/cosmos/cosmos-sdk)

gosec:
# To select a subset of rules to run.
# Available rules: https://github.com/securego/gosec#available-rules
Expand Down
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

## Bug Fixes
### Features

* (baseapp) [#205](https://github.com/crypto-org-chain/cosmos-sdk/pull/205) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support tx parallel execution.

## [Unreleased-Upstream]

### Bug Fixes

* (crypto) [#19691](https://github.com/cosmos/cosmos-sdk/pull/19745) Fix tx sign doesn't throw an error when incorrect Ledger is used.

Expand Down
91 changes: 58 additions & 33 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab
func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
var mode execMode

switch {
case req.Type == abci.CheckTxType_New:
switch req.Type {
case abci.CheckTxType_New:
mode = execModeCheck

case req.Type == abci.CheckTxType_Recheck:
case abci.CheckTxType_Recheck:
mode = execModeReCheck

default:
Expand Down Expand Up @@ -775,48 +775,34 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
app.finalizeBlockState.SetContext(
app.finalizeBlockState.Context().
WithBlockGasMeter(gasMeter).
WithTxCount(len(req.Txs)),
)

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
//
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
for _, rawTx := range req.Txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
txResults, err := app.executeTxs(ctx, req.Txs)
if err != nil {
// usually due to canceled
return nil, err
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
var blockGasUsed uint64
for _, res := range txResults {
blockGasUsed += uint64(res.GasUsed)
}
sdkCtx := app.finalizeBlockState.Context().WithBlockGasUsed(blockGasUsed)

endBlock, err := app.endBlock(sdkCtx)
if err != nil {
return nil, err
}
Expand All @@ -840,6 +826,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}, nil
}

func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
if app.txExecutor != nil {
return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(txs[i], i, ms)
})
}

txResults := make([]*abci.ExecTxResult, 0, len(txs))
for i, rawTx := range txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx, i)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
}
return txResults, nil
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
Expand Down
27 changes: 22 additions & 5 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ type BaseApp struct {
//
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
disableBlockGasMeter bool

// Optional alternative tx executor, used for block-stm parallel transaction execution.
txExecutor TxExecutor
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down Expand Up @@ -659,13 +662,14 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
}

// retrieve the context for the tx w/ txBytes and other memoized values.
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
modeState := app.getState(mode)
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.Context().
WithTxBytes(txBytes)
WithTxBytes(txBytes).
WithTxIndex(txIndex)
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

ctx = ctx.WithConsensusParams(app.GetConsensusParams(ctx))
Expand Down Expand Up @@ -746,7 +750,11 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock,
return resp, nil
}

func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(tx, txIndex, nil)
}

func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult {
gInfo := sdk.GasInfo{}
resultStr := "successful"

Expand All @@ -759,7 +767,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx)
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, txIndex, txMultiStore)
if err != nil {
resultStr = "failed"
resp = sdkerrors.ResponseExecTxResultWithEvents(
Expand Down Expand Up @@ -817,12 +825,19 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) {
// returned if the tx does not run out of gas and if all the messages are valid
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
return app.runTxWithMultiStore(mode, txBytes, -1, nil)
}

func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
// meter, so we initialize upfront.
var gasWanted uint64

ctx := app.getContextForTx(mode, txBytes)
ctx := app.getContextForTx(mode, txBytes, txIndex)
if txMultiStore != nil {
ctx = ctx.WithMultiStore(txMultiStore)
}
ms := ctx.MultiStore()

// only run the tx if there is block gas remaining
Expand Down Expand Up @@ -1000,6 +1015,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
break
}

ctx = ctx.WithMsgIndex(i)

handler := app.msgServiceRouter.Handler(msg)
if handler == nil {
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)
Expand Down
2 changes: 1 addition & 1 deletion baseapp/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
// ExecuteGenesisTx implements genesis.GenesisState from
// cosmossdk.io/core/genesis to set initial state in genesis
func (ba BaseApp) ExecuteGenesisTx(tx []byte) error {
res := ba.deliverTx(tx)
res := ba.deliverTx(tx, -1)

if res.Code != types.CodeTypeOK {
return errors.New(res.Log)
Expand Down
10 changes: 10 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func DisableBlockGasMeter() func(*BaseApp) {
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
return func(app *BaseApp) { app.txExecutor = executor }
}

func (app *BaseApp) SetName(name string) {
if app.sealed {
panic("SetName() on sealed BaseApp")
Expand Down Expand Up @@ -372,3 +377,8 @@ func (app *BaseApp) SetStreamingManager(manager storetypes.StreamingManager) {
func (app *BaseApp) SetDisableBlockGasMeter(disableBlockGasMeter bool) {
app.disableBlockGasMeter = disableBlockGasMeter
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
app.txExecutor = executor
}
4 changes: 2 additions & 2 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
}

func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeFinalize, txBytes)
return app.getContextForTx(execModeFinalize, txBytes, -1)
}

func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeCheck, txBytes)
return app.getContextForTx(execModeCheck, txBytes, -1)
}
16 changes: 16 additions & 0 deletions baseapp/txexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package baseapp

import (
"context"

abci "github.com/cometbft/cometbft/abci/types"

"cosmossdk.io/store/types"
)

type TxExecutor func(
ctx context.Context,
blockSize int,
cms types.MultiStore,
deliverTxWithMultiStore func(int, types.MultiStore) *abci.ExecTxResult,
) ([]*abci.ExecTxResult, error)
35 changes: 35 additions & 0 deletions types/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ type Context struct {
streamingManager storetypes.StreamingManager
cometInfo comet.BlockInfo
headerInfo header.Info

// the index of the current tx in the block, -1 means not in finalize block context
txIndex int
// the index of the current msg in the tx, -1 means not in finalize block context
msgIndex int
// the total number of transactions in current block
txCount int
// sum the gas used by all the transactions in the current block, only accessible by end blocker
blockGasUsed uint64
}

// Proposed rename, not done to avoid API breakage
Expand Down Expand Up @@ -91,6 +100,10 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans
func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager }
func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo }
func (c Context) HeaderInfo() header.Info { return c.headerInfo }
func (c Context) TxIndex() int { return c.txIndex }
func (c Context) MsgIndex() int { return c.msgIndex }
func (c Context) TxCount() int { return c.txCount }
func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed }

// clone the header before returning
func (c Context) BlockHeader() cmtproto.Header {
Expand Down Expand Up @@ -137,6 +150,8 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool
eventManager: NewEventManager(),
kvGasConfig: storetypes.KVGasConfig(),
transientKVGasConfig: storetypes.TransientGasConfig(),
txIndex: -1,
msgIndex: -1,
}
}

Expand Down Expand Up @@ -310,6 +325,26 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context {
return c
}

func (c Context) WithTxIndex(txIndex int) Context {
c.txIndex = txIndex
return c
}

func (c Context) WithTxCount(txCount int) Context {
c.txCount = txCount
return c
}

func (c Context) WithMsgIndex(msgIndex int) Context {
c.msgIndex = msgIndex
return c
}

func (c Context) WithBlockGasUsed(gasUsed uint64) Context {
c.blockGasUsed = gasUsed
return c
}

// TODO: remove???
func (c Context) IsZero() bool {
return c.ms == nil
Expand Down
Loading