Skip to content

Commit

Permalink
[STAB-31] Allow AnteHandler to control locking semantics.
Browse files Browse the repository at this point in the history
Updated existing usages of AnteHandler in cosmos to use the instance which always locks restoring the existing contract.
  • Loading branch information
lcwik committed Jan 31, 2024
1 parent ac7a054 commit 50ac18b
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 102 deletions.
155 changes: 87 additions & 68 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,13 @@ func TestABCI_FinalizeBlock_DeliverTx(t *testing.T) {
}
}

func wrapWithLockAndCacheContextDecorator(handler sdk.AnteHandler) sdk.AnteHandler {
lockingHandler := baseapp.NewLockAndCacheContextAnteDecorator()
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
return lockingHandler.AnteHandle(ctx, tx, simulate, handler)
}
}

func TestABCI_FinalizeBlock_MultiMsg(t *testing.T) {
anteKey := []byte("ante-key")
anteOpt := func(bapp *baseapp.BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, anteKey)) }
Expand Down Expand Up @@ -735,10 +742,12 @@ func TestABCI_FinalizeBlock_MultiMsg(t *testing.T) {
func TestABCI_Query_SimulateTx(t *testing.T) {
gasConsumed := uint64(5)
anteOpt := func(bapp *baseapp.BaseApp) {
bapp.SetAnteHandler(func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
newCtx = ctx.WithGasMeter(storetypes.NewGasMeter(gasConsumed))
return
})
bapp.SetAnteHandler(wrapWithLockAndCacheContextDecorator(
func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
newCtx = ctx.WithGasMeter(storetypes.NewGasMeter(gasConsumed))
return
}),
)
}
suite := NewBaseAppSuite(t, anteOpt)

Expand Down Expand Up @@ -796,9 +805,11 @@ func TestABCI_Query_SimulateTx(t *testing.T) {

func TestABCI_InvalidTransaction(t *testing.T) {
anteOpt := func(bapp *baseapp.BaseApp) {
bapp.SetAnteHandler(func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
return
})
bapp.SetAnteHandler(wrapWithLockAndCacheContextDecorator(
func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
return
}),
)
}

suite := NewBaseAppSuite(t, anteOpt)
Expand Down Expand Up @@ -922,29 +933,31 @@ func TestABCI_InvalidTransaction(t *testing.T) {
func TestABCI_TxGasLimits(t *testing.T) {
gasGranted := uint64(10)
anteOpt := func(bapp *baseapp.BaseApp) {
bapp.SetAnteHandler(func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
newCtx = ctx.WithGasMeter(storetypes.NewGasMeter(gasGranted))

// AnteHandlers must have their own defer/recover in order for the BaseApp
// to know how much gas was used! This is because the GasMeter is created in
// the AnteHandler, but if it panics the context won't be set properly in
// runTx's recover call.
defer func() {
if r := recover(); r != nil {
switch rType := r.(type) {
case storetypes.ErrorOutOfGas:
err = errorsmod.Wrapf(sdkerrors.ErrOutOfGas, "out of gas in location: %v", rType.Descriptor)
default:
panic(r)
bapp.SetAnteHandler(wrapWithLockAndCacheContextDecorator(
func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
newCtx = ctx.WithGasMeter(storetypes.NewGasMeter(gasGranted))

// AnteHandlers must have their own defer/recover in order for the BaseApp
// to know how much gas was used! This is because the GasMeter is created in
// the AnteHandler, but if it panics the context won't be set properly in
// runTx's recover call.
defer func() {
if r := recover(); r != nil {
switch rType := r.(type) {
case storetypes.ErrorOutOfGas:
err = errorsmod.Wrapf(sdkerrors.ErrOutOfGas, "out of gas in location: %v", rType.Descriptor)
default:
panic(r)
}
}
}
}()
}()

count, _ := parseTxMemo(t, tx)
newCtx.GasMeter().ConsumeGas(uint64(count), "counter-ante")
count, _ := parseTxMemo(t, tx)
newCtx.GasMeter().ConsumeGas(uint64(count), "counter-ante")

return newCtx, nil
})
return newCtx, nil
}),
)
}

suite := NewBaseAppSuite(t, anteOpt)
Expand Down Expand Up @@ -1018,25 +1031,27 @@ func TestABCI_TxGasLimits(t *testing.T) {
func TestABCI_MaxBlockGasLimits(t *testing.T) {
gasGranted := uint64(10)
anteOpt := func(bapp *baseapp.BaseApp) {
bapp.SetAnteHandler(func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
newCtx = ctx.WithGasMeter(storetypes.NewGasMeter(gasGranted))

defer func() {
if r := recover(); r != nil {
switch rType := r.(type) {
case storetypes.ErrorOutOfGas:
err = errorsmod.Wrapf(sdkerrors.ErrOutOfGas, "out of gas in location: %v", rType.Descriptor)
default:
panic(r)
bapp.SetAnteHandler(wrapWithLockAndCacheContextDecorator(
func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
newCtx = ctx.WithGasMeter(storetypes.NewGasMeter(gasGranted))

defer func() {
if r := recover(); r != nil {
switch rType := r.(type) {
case storetypes.ErrorOutOfGas:
err = errorsmod.Wrapf(sdkerrors.ErrOutOfGas, "out of gas in location: %v", rType.Descriptor)
default:
panic(r)
}
}
}
}()
}()

count, _ := parseTxMemo(t, tx)
newCtx.GasMeter().ConsumeGas(uint64(count), "counter-ante")
count, _ := parseTxMemo(t, tx)
newCtx.GasMeter().ConsumeGas(uint64(count), "counter-ante")

return
})
return
}),
)
}

suite := NewBaseAppSuite(t, anteOpt)
Expand Down Expand Up @@ -1115,29 +1130,31 @@ func TestABCI_MaxBlockGasLimits(t *testing.T) {
func TestABCI_GasConsumptionBadTx(t *testing.T) {
gasWanted := uint64(5)
anteOpt := func(bapp *baseapp.BaseApp) {
bapp.SetAnteHandler(func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
newCtx = ctx.WithGasMeter(storetypes.NewGasMeter(gasWanted))

defer func() {
if r := recover(); r != nil {
switch rType := r.(type) {
case storetypes.ErrorOutOfGas:
log := fmt.Sprintf("out of gas in location: %v", rType.Descriptor)
err = errorsmod.Wrap(sdkerrors.ErrOutOfGas, log)
default:
panic(r)
bapp.SetAnteHandler(wrapWithLockAndCacheContextDecorator(
func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
newCtx = ctx.WithGasMeter(storetypes.NewGasMeter(gasWanted))

defer func() {
if r := recover(); r != nil {
switch rType := r.(type) {
case storetypes.ErrorOutOfGas:
log := fmt.Sprintf("out of gas in location: %v", rType.Descriptor)
err = errorsmod.Wrap(sdkerrors.ErrOutOfGas, log)
default:
panic(r)
}
}
}
}()
}()

counter, failOnAnte := parseTxMemo(t, tx)
newCtx.GasMeter().ConsumeGas(uint64(counter), "counter-ante")
if failOnAnte {
return newCtx, errorsmod.Wrap(sdkerrors.ErrUnauthorized, "ante handler failure")
}
counter, failOnAnte := parseTxMemo(t, tx)
newCtx.GasMeter().ConsumeGas(uint64(counter), "counter-ante")
if failOnAnte {
return newCtx, errorsmod.Wrap(sdkerrors.ErrUnauthorized, "ante handler failure")
}

return
})
return
}),
)
}

suite := NewBaseAppSuite(t, anteOpt)
Expand Down Expand Up @@ -1172,11 +1189,13 @@ func TestABCI_GasConsumptionBadTx(t *testing.T) {
func TestABCI_Query(t *testing.T) {
key, value := []byte("hello"), []byte("goodbye")
anteOpt := func(bapp *baseapp.BaseApp) {
bapp.SetAnteHandler(func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
store := ctx.KVStore(capKey1)
store.Set(key, value)
return
})
bapp.SetAnteHandler(wrapWithLockAndCacheContextDecorator(
func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
store := ctx.KVStore(capKey1)
store.Set(key, value)
return
}),
)
}

suite := NewBaseAppSuite(t, anteOpt)
Expand Down
42 changes: 19 additions & 23 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type BaseApp struct {
optimisticExec *oe.OptimisticExecution

// Used to synchronize the application when using an unsynchronized ABCI++ client.
mtx sync.Mutex
mtx sync.RWMutex
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down Expand Up @@ -653,6 +653,11 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
WithTxBytes(txBytes)
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

// Use a new gas meter since loading the consensus params below consumes gas and causes a race condition.
//
// TODO(STAB-35): See if this can be removed.
ctx = ctx.WithGasMeter(storetypes.NewInfiniteGasMeter())

ctx = ctx.WithConsensusParams(app.GetConsensusParams(ctx))

if mode == execModeReCheck {
Expand All @@ -668,7 +673,7 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, storetypes.CacheMultiStore) {
func cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, storetypes.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
msCache := ms.CacheMultiStore()
Expand Down Expand Up @@ -842,8 +847,8 @@ func (app *BaseApp) runCheckTxConcurrently(mode execMode, txBytes []byte) (gInfo
// embedded here to ensure that the lifetime of the mutex is limited to only this function allowing
// for the return values to be computed without holding the lock.
func() {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

ctx := app.getContextForTx(mode, txBytes)
ms := ctx.MultiStore()
Expand All @@ -859,25 +864,17 @@ func (app *BaseApp) runCheckTxConcurrently(mode execMode, txBytes []byte) (gInfo
}()

if app.anteHandler != nil {
var (
anteCtx sdk.Context
msCache storetypes.CacheMultiStore
newCtx sdk.Context
)
var newCtx sdk.Context

// Branch context before AnteHandler call in case it aborts.
// This is required for both CheckTx and DeliverTx.
// Ref: https://github.com/cosmos/cosmos-sdk/issues/2772
//
// NOTE: Alternatively, we could require that AnteHandler ensures that
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
newCtx, err = app.anteHandler(anteCtx, tx, mode == execModeSimulate)
// Typically the Cosmos SDK branches the context before passing it to the ante handler but here we
// allow the application's AnteHandler to control branching semantics for the context itself.
// We also guarantee that the passed in context is held with a read lock allowing for concurrent
// execution.
anteCtx := ctx.WithEventManager(sdk.NewEventManager())
newCtx, err = app.anteHandler(anteCtx, tx, false /* mode == execModeSimulate */)

if !newCtx.IsZero() {
// At this point, newCtx.MultiStore() is a store branch, or something else
// At this point, ctx.MultiStore() is a store branch, or something else
// replaced by the AnteHandler. We want the original multistore.
//
// Also, in the case of the tx aborting, we need to track gas consumed via
Expand All @@ -899,7 +896,6 @@ func (app *BaseApp) runCheckTxConcurrently(mode execMode, txBytes []byte) (gInfo
return
}

msCache.Write()
anteEvents = events.ToABCIEvents()
}

Expand Down Expand Up @@ -1022,7 +1018,7 @@ func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, res
// NOTE: Alternatively, we could require that AnteHandler ensures that
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
anteCtx, msCache = cacheTxContext(ctx, txBytes)
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
newCtx, err := app.anteHandler(anteCtx, tx, mode == execModeSimulate)

Expand Down Expand Up @@ -1065,7 +1061,7 @@ func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, res
// Create a new Context based off of the existing Context with a MultiStore branch
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes)
runMsgCtx, msCache := cacheTxContext(ctx, txBytes)

// Attempt to execute all messages and only update state if all messages pass
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
Expand Down
22 changes: 13 additions & 9 deletions baseapp/baseapp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,14 @@ func NewBaseAppSuiteWithSnapshots(t *testing.T, cfg SnapshotsConfig, opts ...fun
func TestAnteHandlerGasMeter(t *testing.T) {
// run BeginBlock and assert that the gas meter passed into the first Txn is zeroed out
anteOpt := func(bapp *baseapp.BaseApp) {
bapp.SetAnteHandler(func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
gasMeter := ctx.BlockGasMeter()
require.NotNil(t, gasMeter)
require.Equal(t, storetypes.Gas(0), gasMeter.GasConsumed())
return ctx, nil
})
bapp.SetAnteHandler(wrapWithLockAndCacheContextDecorator(
func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
gasMeter := ctx.BlockGasMeter()
require.NotNil(t, gasMeter)
require.Equal(t, storetypes.Gas(0), gasMeter.GasConsumed())
return ctx, nil
}),
)
}
// set the beginBlocker to use some gas
beginBlockerOpt := func(bapp *baseapp.BaseApp) {
Expand Down Expand Up @@ -516,9 +518,11 @@ func TestCustomRunTxPanicHandler(t *testing.T) {
customPanicMsg := "test panic"
anteErr := errorsmod.Register("fakeModule", 100500, "fakeError")
anteOpt := func(bapp *baseapp.BaseApp) {
bapp.SetAnteHandler(func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
panic(errorsmod.Wrap(anteErr, "anteHandler"))
})
bapp.SetAnteHandler(wrapWithLockAndCacheContextDecorator(
func(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
panic(errorsmod.Wrap(anteErr, "anteHandler"))
}),
)
}

suite := NewBaseAppSuite(t, anteOpt)
Expand Down
34 changes: 34 additions & 0 deletions baseapp/locking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package baseapp

import (
"sync"

storetypes "cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"
)

var _ sdk.AnteDecorator = lockAndCacheContextDecorator{}

func NewLockAndCacheContextAnteDecorator() sdk.AnteDecorator {
return lockAndCacheContextDecorator{
mtx: &sync.Mutex{},
}
}

type lockAndCacheContextDecorator struct {
mtx *sync.Mutex
}

func (l lockAndCacheContextDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler) (sdk.Context, error) {
l.mtx.Lock()
defer l.mtx.Unlock()

var cacheMs storetypes.CacheMultiStore
ctx, cacheMs = cacheTxContext(ctx, ctx.TxBytes())
newCtx, err := next(ctx, tx, simulate)
if err == nil {
cacheMs.Write()
}
return newCtx, err
}
Loading

0 comments on commit 50ac18b

Please sign in to comment.