diff --git a/baseapp/abci.go b/baseapp/abci.go index d833061b9997..e4079bb690ed 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -366,9 +366,6 @@ func (app *BaseApp) CheckTxSync(req *abci.RequestCheckTx) (*abci.ResponseCheckTx return &abci.ResponseCheckTx{ GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints? GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints? - // Log: result.Log, - // Data: result.Data, - // Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents), }, nil } diff --git a/baseapp/accountwgs.go b/baseapp/accountwgs.go index 648d98c95877..ff16a65ce16e 100644 --- a/baseapp/accountwgs.go +++ b/baseapp/accountwgs.go @@ -18,15 +18,29 @@ func NewAccountWGs() *AccountWGs { } } +// Register registers transaction signers and returns necessary WaitGroups. +// It ensures that transactions from the same signer are processed sequentially. +// +// Parameters: +// - cdc: Codec for decoding transaction messages +// - tx: Transaction to be registered +// +// Returns: +// - waits: WaitGroups that need to be waited on (from previous transactions of the same signers) +// - signals: WaitGroups that should be signaled when this transaction completes func (aw *AccountWGs) Register(cdc codec.Codec, tx sdk.Tx) (waits []*sync.WaitGroup, signals []*AccountWG) { + // Get unique signers from the tx signers := getUniqSigners(cdc, tx) aw.mtx.Lock() defer aw.mtx.Unlock() for _, signer := range signers { + // If there's an existing WaitGroup for this signer, + // we need to wait for it before processing the current transaction if wg := aw.wgs[signer]; wg != nil { waits = append(waits, wg) } + // Create a new WaitGroup for the current transaction sig := waitGroup1() aw.wgs[signer] = sig signals = append(signals, NewAccountWG(signer, sig)) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index e755e834d226..9dbd2f8d257d 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -520,6 +520,11 @@ func (app *BaseApp) setState(mode execMode, h cmtproto.Header) { switch mode { case execModeCheck: + // Acquire a lock to ensure thread-safe access to `checkState` during state updates. + // Since `CheckTxAsync` runs checkTx concurrently, this lock is necessary to + // prevent data races and ensure that updates to `checkState` remain consistent + // across all goroutines. Without this lock, concurrent modifications could + // lead to inconsistent application states or runtime errors. app.checkStateMtx.Lock() defer app.checkStateMtx.Unlock() baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)) @@ -690,6 +695,10 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter { // retrieve the context for the tx w/ txBytes and other memoized values. func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context { if mode == execModeCheck || mode == execModeReCheck { + // By introducing Parallel CheckTx, now getContextForTx can be called by multiple goroutines. + // Without this lock, multiple goroutines could attempt to modify or read `checkState` simultaneously, + // leading to inconsistent or corrupted state. + // The lock guarantees a consistent context is maintained across all CheckTx operations. app.checkStateMtx.Lock() defer app.checkStateMtx.Unlock() }