Skip to content

Commit

Permalink
feat: concurrent rechecktx
Browse files Browse the repository at this point in the history
  • Loading branch information
dudong2 committed Dec 17, 2024
1 parent 2b172f4 commit 7c2cbfc
Show file tree
Hide file tree
Showing 11 changed files with 8,158 additions and 5,387 deletions.
11,633 changes: 7,007 additions & 4,626 deletions api/tendermint/abci/types.pulsar.go

Large diffs are not rendered by default.

74 changes: 74 additions & 0 deletions api/tendermint/abci/types_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 45 additions & 6 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab
// internal CheckTx state if the AnteHandler passes. Otherwise, the ResponseCheckTx
// will contain relevant error information. Regardless of tx execution outcome,
// the ResponseCheckTx will contain relevant gas execution context.
func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
func (app *BaseApp) CheckTxSync(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
defer telemetry.MeasureSince(time.Now(), "abci", "check_tx")

var mode execMode

switch req.Type {
Expand All @@ -347,20 +349,57 @@ func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, er
return nil, fmt.Errorf("unknown RequestCheckTx type: %s", req.Type)
}

gInfo, result, anteEvents, err := app.runTx(mode, req.Tx)
tx, err := app.preCheckTx(req.Tx)
if err != nil {
return sdkerrors.ResponseCheckTxWithEvents(err, gInfo.GasWanted, gInfo.GasUsed, anteEvents, app.trace), nil
return sdkerrors.ResponseCheckTxWithEvents(err, 0, 0, nil, false), err
}

waits, signals := app.checkAccountWGs.Register(app.cdc, tx)

app.checkAccountWGs.Wait(waits)
defer app.checkAccountWGs.Done(signals)

gInfo, err := app.checkTx(mode, req.Tx, tx)
if err != nil {
return sdkerrors.ResponseCheckTxWithEvents(err, gInfo.GasWanted, gInfo.GasUsed, nil, false), nil
}
return &abci.ResponseCheckTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
// Log: result.Log,
// Data: result.Data,
// Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}, nil
}

func (app *BaseApp) CheckTxAsync(req *abci.RequestCheckTx, callback abci.CheckTxCallback) {
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

reqCheckTx := &RequestCheckTxAsync{
txBytes: req.Tx,
txType: req.Type,
callback: callback,
prepare: waitGroup1(),
}
app.chCheckTx <- reqCheckTx

go app.prepareCheckTx(reqCheckTx)
}

// BeginRecheckTx implements the ABCI interface and set the check state based on the given header
func (app *BaseApp) BeginRecheckTx(req *abci.RequestBeginRecheckTx) (*abci.ResponseBeginRecheckTx, error) {
// NOTE: This is safe because CometBFT holds a lock on the mempool for Rechecking.
app.setState(execModeCheck, req.Header)
return &abci.ResponseBeginRecheckTx{Code: abci.CodeTypeOK}, nil
}

// EndRecheckTx implements the ABCI interface.
func (app *BaseApp) EndRecheckTx(req *abci.RequestEndRecheckTx) (*abci.ResponseEndRecheckTx, error) {
return &abci.ResponseEndRecheckTx{Code: abci.CodeTypeOK}, nil
}

// PrepareProposal implements the PrepareProposal ABCI method and returns a
// ResponsePrepareProposal object to the client. The PrepareProposal method is
// responsible for allowing the block proposer to perform application-dependent
Expand Down
10 changes: 5 additions & 5 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func TestABCI_CheckTx(t *testing.T) {
txBytes, err := suite.txConfig.TxEncoder()(tx)
require.NoError(t, err)

r, err := suite.baseApp.CheckTx(&abci.RequestCheckTx{Tx: txBytes})
r, err := suite.baseApp.CheckTxSync(&abci.RequestCheckTx{Tx: txBytes})
require.NoError(t, err)
require.True(t, r.IsOK(), fmt.Sprintf("%v", r))
require.Empty(t, r.GetEvents())
Expand Down Expand Up @@ -1413,7 +1413,7 @@ func TestABCI_Proposal_HappyPath(t *testing.T) {
Tx: txBytes,
Type: abci.CheckTxType_New,
}
_, err = suite.baseApp.CheckTx(&reqCheckTx)
_, err = suite.baseApp.CheckTxSync(&reqCheckTx)
require.NoError(t, err)

tx2 := newTxCounter(t, suite.txConfig, 1, 1)
Expand Down Expand Up @@ -1726,7 +1726,7 @@ func TestABCI_PrepareProposal_Failures(t *testing.T) {
Tx: txBytes,
Type: abci.CheckTxType_New,
}
checkTxRes, err := suite.baseApp.CheckTx(&reqCheckTx)
checkTxRes, err := suite.baseApp.CheckTxSync(&reqCheckTx)
require.NoError(t, err)
require.True(t, checkTxRes.IsOK())

Expand Down Expand Up @@ -2428,7 +2428,7 @@ func TestABCI_Proposal_FailReCheckTx(t *testing.T) {
Tx: txBytes,
Type: abci.CheckTxType_New,
}
_, err = suite.baseApp.CheckTx(&reqCheckTx)
_, err = suite.baseApp.CheckTxSync(&reqCheckTx)
require.NoError(t, err)

tx2 := newTxCounter(t, suite.txConfig, 1, 1)
Expand All @@ -2455,7 +2455,7 @@ func TestABCI_Proposal_FailReCheckTx(t *testing.T) {
Tx: txBytes,
Type: abci.CheckTxType_Recheck,
}
resp, err := suite.baseApp.CheckTx(&reqReCheckTx)
resp, err := suite.baseApp.CheckTxSync(&reqReCheckTx)
require.NoError(t, err)
require.True(t, resp.IsErr())
require.Equal(t, "recheck failed in ante handler", resp.Log)
Expand Down
91 changes: 91 additions & 0 deletions baseapp/accountwgs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package baseapp

import (
"sync"

"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
)

type AccountWGs struct {
mtx sync.Mutex
wgs map[string]*sync.WaitGroup
}

func NewAccountWGs() *AccountWGs {
return &AccountWGs{
wgs: make(map[string]*sync.WaitGroup),
}
}

func (aw *AccountWGs) Register(cdc codec.Codec, tx sdk.Tx) (waits []*sync.WaitGroup, signals []*AccountWG) {
signers := getUniqSigners(cdc, tx)

aw.mtx.Lock()
defer aw.mtx.Unlock()
for _, signer := range signers {
if wg := aw.wgs[signer]; wg != nil {
waits = append(waits, wg)
}
sig := waitGroup1()
aw.wgs[signer] = sig
signals = append(signals, NewAccountWG(signer, sig))
}

return waits, signals
}

func (aw *AccountWGs) Wait(waits []*sync.WaitGroup) {
for _, wait := range waits {
wait.Wait()
}
}

func (aw *AccountWGs) Done(signals []*AccountWG) {
aw.mtx.Lock()
defer aw.mtx.Unlock()

for _, signal := range signals {
signal.wg.Done()
if aw.wgs[signal.acc] == signal.wg {
delete(aw.wgs, signal.acc)
}
}
}

func getUniqSigners(cdc codec.Codec, tx sdk.Tx) (signers []string) {
seen := map[string]bool{}
for _, msg := range tx.GetMsgs() {
msgSigners, _, err := cdc.GetMsgV1Signers(msg)
if err != nil {
// ! need to handle error?
}

for _, msgSigner := range msgSigners {
addr := sdk.AccAddress(msgSigner).String()
if !seen[addr] {
signers = append(signers, addr)
seen[addr] = true
}
}
}
return signers
}

type AccountWG struct {
acc string
wg *sync.WaitGroup
}

func NewAccountWG(acc string, wg *sync.WaitGroup) *AccountWG {
return &AccountWG{
acc: acc,
wg: wg,
}
}

func waitGroup1() (wg *sync.WaitGroup) {
wg = &sync.WaitGroup{}
wg.Add(1)
return wg
}
Loading

0 comments on commit 7c2cbfc

Please sign in to comment.