From f65b20ea1f6419546ef5c613c23f12d37f025846 Mon Sep 17 00:00:00 2001 From: erick yan <46879318+erickyan86@users.noreply.github.com> Date: Mon, 8 Jul 2019 11:45:39 +0800 Subject: [PATCH] move TxPool reorg and events to background goroutine (#378) * move TxPool reorg and events to background goroutine * modify txpool handler test --- blockchain/genesis.go | 5 +- rpcapi/p2p.go | 14 +- txpool/accountset.go | 36 +- txpool/config.go | 4 + txpool/handler_test.go | 4 +- txpool/{addrheartbeat.go => nameheartbeat.go} | 0 txpool/pricedlsit.go | 4 +- txpool/test_utils.go | 1 + txpool/txpool.go | 705 +++++++++++------- txpool/txpool_test.go | 188 ++--- 10 files changed, 519 insertions(+), 442 deletions(-) rename txpool/{addrheartbeat.go => nameheartbeat.go} (100%) diff --git a/blockchain/genesis.go b/blockchain/genesis.go index 46d65fc9..9c687283 100644 --- a/blockchain/genesis.go +++ b/blockchain/genesis.go @@ -24,17 +24,16 @@ import ( "strings" "time" - "github.com/fractalplatform/fractal/consensus/dpos" - "github.com/fractalplatform/fractal/snapshot" - "github.com/ethereum/go-ethereum/log" am "github.com/fractalplatform/fractal/accountmanager" at "github.com/fractalplatform/fractal/asset" "github.com/fractalplatform/fractal/common" + "github.com/fractalplatform/fractal/consensus/dpos" fm "github.com/fractalplatform/fractal/feemanager" "github.com/fractalplatform/fractal/p2p/enode" "github.com/fractalplatform/fractal/params" "github.com/fractalplatform/fractal/rawdb" + "github.com/fractalplatform/fractal/snapshot" "github.com/fractalplatform/fractal/state" "github.com/fractalplatform/fractal/types" "github.com/fractalplatform/fractal/utils/fdb" diff --git a/rpcapi/p2p.go b/rpcapi/p2p.go index ea0d7dad..3537731e 100644 --- a/rpcapi/p2p.go +++ b/rpcapi/p2p.go @@ -132,13 +132,19 @@ func (api *PrivateP2pAPI) BadNodes() []string { } // AddBadNode add a bad node -func (api *PrivateP2pAPI) AddBadNode(url string) error { - return api.b.AddBadNode(url) +func (api *PrivateP2pAPI) AddBadNode(url string) (bool, error) { + if err := api.b.AddBadNode(url); err != nil { + return false, fmt.Errorf("invalid enode: %v", err) + } + return true, nil } // RemoveBadNode remove a bad node -func (api *PrivateP2pAPI) RemoveBadNode(url string) error { - return api.b.RemoveBadNode(url) +func (api *PrivateP2pAPI) RemoveBadNode(url string) (bool, error) { + if err := api.b.RemoveBadNode(url); err != nil { + return false, fmt.Errorf("invalid enode: %v", err) + } + return true, nil } // SelfNode return self enode url diff --git a/txpool/accountset.go b/txpool/accountset.go index 70747917..1360ce23 100644 --- a/txpool/accountset.go +++ b/txpool/accountset.go @@ -24,14 +24,22 @@ import ( // accountSet is simply a set of name to check for existence type accountSet struct { accounts map[common.Name]struct{} + cache *[]common.Name } // newAccountSet creates a new name set with an associated signer for sender // derivations. -func newAccountSet(signer types.Signer) *accountSet { - return &accountSet{ - accounts: make(map[common.Name]struct{}), +func newAccountSet(signer types.Signer, names ...common.Name) *accountSet { + as := &accountSet{accounts: make(map[common.Name]struct{})} + for _, name := range names { + as.add(name) } + return as +} + +// addTx adds the sender of tx into the set. +func (as *accountSet) addTx(tx *types.Transaction) { + as.add(tx.GetActions()[0].Sender()) } // contains checks if a given name is contained within the set. 
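The accountSet changes above let a set be seeded with names at construction time and updated straight from a transaction's first action via addTx; together with the cached flatten and merge helpers added in the next hunk, this is how the new add path hands "dirty" sender names to the background promotion run. A small standalone Go sketch of the same seeded-set-plus-cached-flatten pattern, using simplified stand-in types rather than the package's own:

package main

import "fmt"

// name stands in for common.Name; the pool's real accountSet keys on that type.
type name string

// nameSet mirrors the patch's accountSet shape: a membership map plus a lazily
// built slice cache that every write invalidates.
type nameSet struct {
	members map[name]struct{}
	cache   *[]name
}

// newNameSet mirrors the new variadic constructor: optional seed names.
func newNameSet(seed ...name) *nameSet {
	s := &nameSet{members: make(map[name]struct{})}
	for _, n := range seed {
		s.add(n)
	}
	return s
}

func (s *nameSet) add(n name) {
	s.members[n] = struct{}{}
	s.cache = nil // any write drops the flattened view
}

// flatten returns the member list, caching it so repeated reads stay cheap.
func (s *nameSet) flatten() []name {
	if s.cache == nil {
		out := make([]name, 0, len(s.members))
		for n := range s.members {
			out = append(out, n)
		}
		s.cache = &out
	}
	return *s.cache
}

func main() {
	dirty := newNameSet("alice")
	dirty.add("bob")
	fmt.Println(dirty.flatten()) // e.g. [alice bob]; map iteration order is not fixed
}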
@@ -49,4 +57,26 @@ func (as *accountSet) containsName(tx *types.Transaction) bool { // add inserts a new name into the set to track. func (as *accountSet) add(name common.Name) { as.accounts[name] = struct{}{} + as.cache = nil +} + +// flatten returns the list of addresses within this set, also caching it for later +// reuse. The returned slice should not be changed! +func (as *accountSet) flatten() []common.Name { + if as.cache == nil { + accounts := make([]common.Name, 0, len(as.accounts)) + for account := range as.accounts { + accounts = append(accounts, account) + } + as.cache = &accounts + } + return *as.cache +} + +// merge adds all addresses from the 'other' set into 'as'. +func (as *accountSet) merge(other *accountSet) { + for addr := range other.accounts { + as.accounts[addr] = struct{}{} + } + as.cache = nil } diff --git a/txpool/config.go b/txpool/config.go index 9faa876a..63b25ce8 100644 --- a/txpool/config.go +++ b/txpool/config.go @@ -96,6 +96,10 @@ func (config *Config) check() Config { log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime) conf.Lifetime = DefaultTxPoolConfig.Lifetime } + if conf.ResendTime < 1 { + log.Warn("Sanitizing invalid txpool resendtime", "provided", conf.ResendTime, "updated", DefaultTxPoolConfig.ResendTime) + conf.ResendTime = DefaultTxPoolConfig.ResendTime + } if conf.RatioBroadcast < 1 { log.Warn("Sanitizing invalid txpool ratiobroadcast", "provided", conf.RatioBroadcast, "updated", DefaultTxPoolConfig.RatioBroadcast) conf.RatioBroadcast = DefaultTxPoolConfig.RatioBroadcast diff --git a/txpool/handler_test.go b/txpool/handler_test.go index 88832e81..98b1e8a1 100644 --- a/txpool/handler_test.go +++ b/txpool/handler_test.go @@ -157,7 +157,7 @@ func TestP2PTxMsg(t *testing.T) { event.SendTo(event.NewLocalStation("test", nil), nil, event.P2PTxMsg, txs) for { - if pending, quened := pool.Stats(); pending > 0 || quened > 0 { + if pending, _ := pool.Stats(); pending > 0 { break } } @@ -172,7 +172,7 @@ func TestP2PTxMsg(t *testing.T) { // trigger state change in the background trigger = true - pool.lockedReset(nil, nil) + pool.requestReset(nil, nil) _, err = pool.Pending() if err != nil { diff --git a/txpool/addrheartbeat.go b/txpool/nameheartbeat.go similarity index 100% rename from txpool/addrheartbeat.go rename to txpool/nameheartbeat.go diff --git a/txpool/pricedlsit.go b/txpool/pricedlsit.go index 97f67e9a..a72eda10 100644 --- a/txpool/pricedlsit.go +++ b/txpool/pricedlsit.go @@ -49,9 +49,9 @@ func (l *txPricedList) Put(tx *types.Transaction) { // Removed notifies the prices transaction list that an old transaction dropped // from the pool. The list will just keep a counter of stale objects and update // the heap if a large enough ratio of transactions go stale. 
-func (l *txPricedList) Removed() { +func (l *txPricedList) Removed(count int) { // Bump the stale counter, but exit if still too low (< 25%) - l.stales++ + l.stales += count if l.stales <= len(*l.items)/4 { return } diff --git a/txpool/test_utils.go b/txpool/test_utils.go index 9c0ffdf4..5b3877c5 100644 --- a/txpool/test_utils.go +++ b/txpool/test_utils.go @@ -49,6 +49,7 @@ func init() { AccountQueue: 64, GlobalQueue: 1024, Lifetime: 3 * time.Hour, + ResendTime: 10 * time.Minute, GasAssetID: uint64(0), } } diff --git a/txpool/txpool.go b/txpool/txpool.go index fa394914..6d4bc306 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -17,7 +17,6 @@ package txpool import ( - "fmt" "math" "math/big" "sort" @@ -63,30 +62,41 @@ type blockChain interface { StateAt(root common.Hash) (*state.StateDB, error) Config() *params.ChainConfig } +type txpoolResetRequest struct { + oldHead, newHead *types.Header +} // TxPool contains all currently known transactions. type TxPool struct { - config Config - gasPrice *big.Int - chain blockChain - signer types.Signer - chainHeadCh chan *event.Event - chainHeadSub event.Subscription - curAccountManager *am.AccountManager - pendingAccountManager *am.AccountManager - currentMaxGas uint64 // Current gas limit for transaction caps - locals *accountSet // Set of local transaction to exempt from eviction rules - journal *txJournal // Journal of local transaction to back up to disk - pending map[common.Name]*txList - queue map[common.Name]*txList - beats map[common.Name]time.Time // Last heartbeat from each known account - all *txLookup // All transactions to allow lookups - priced *txPricedList - station *TxpoolStation - - mu sync.RWMutex - pendingWg sync.WaitGroup // for pending preferring - wg sync.WaitGroup // for shutdown sync + config Config + gasPrice *big.Int + chain blockChain + signer types.Signer + + curAccountManager *am.AccountManager // Current state in the blockchain head + pendingAccountManager *am.AccountManager // Pending state tracking virtual nonces + currentMaxGas uint64 // Current gas limit for transaction caps + + locals *accountSet // Set of local transaction to exempt from eviction rules + journal *txJournal // Journal of local transaction to back up to disk + + pending map[common.Name]*txList + queue map[common.Name]*txList + beats map[common.Name]time.Time // Last heartbeat from each known account + all *txLookup // All transactions to allow lookups + priced *txPricedList + station *TxpoolStation + + chainHeadCh chan *event.Event + chainHeadSub event.Subscription + reqResetCh chan *txpoolResetRequest + reqPromoteCh chan *accountSet + queueTxEventCh chan *types.Transaction + reorgDoneCh chan chan struct{} + reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop + + mu sync.RWMutex + wg sync.WaitGroup // for shutdown sync } // New creates a new transaction pool to gather, sort and filter inbound @@ -97,21 +107,32 @@ func New(config Config, chainconfig *params.ChainConfig, bc blockChain) *TxPool config = (&config).check() signer := types.NewSigner(chainconfig.ChainID) all := newTxLookup() + tp := &TxPool{ - config: config, - chain: bc, - signer: signer, - locals: newAccountSet(signer), - chainHeadCh: make(chan *event.Event, chainHeadChanSize), - pending: make(map[common.Name]*txList), - queue: make(map[common.Name]*txList), - beats: make(map[common.Name]time.Time), - all: all, - priced: newTxPricedList(all), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chain: bc, + signer: signer, + locals: 
newAccountSet(signer), + pending: make(map[common.Name]*txList), + queue: make(map[common.Name]*txList), + beats: make(map[common.Name]time.Time), + all: all, + priced: newTxPricedList(all), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), + chainHeadCh: make(chan *event.Event, chainHeadChanSize), + reqResetCh: make(chan *txpoolResetRequest), + reqPromoteCh: make(chan *accountSet), + queueTxEventCh: make(chan *types.Transaction), + reorgDoneCh: make(chan chan struct{}), + reorgShutdownCh: make(chan struct{}), } + tp.reset(nil, bc.CurrentBlock().Header()) + // Start the reorg loop early so it can handle requests generated during journal loading. + tp.wg.Add(1) + go tp.scheduleReorgLoop() + // If local transactions and journaling is enabled, load from disk if !config.NoLocals && config.Journal != "" { tp.journal = newTxJournal(config.Journal) @@ -132,6 +153,79 @@ func New(config Config, chainconfig *params.ChainConfig, bc blockChain) *TxPool return tp } +// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not +// call those methods directly, but request them being run using requestReset and +// requestPromoteExecutables instead. +func (tp *TxPool) scheduleReorgLoop() { + defer tp.wg.Done() + + var ( + curDone chan struct{} // non-nil while runReorg is active + nextDone = make(chan struct{}) + launchNextRun bool + reset *txpoolResetRequest + dirtyAccounts *accountSet + queuedEvents = make(map[common.Name]*txSortedMap) + ) + + for { + // Launch next background reorg if needed + if curDone == nil && launchNextRun { + // Run the background reorg and announcements + go tp.runReorg(nextDone, reset, dirtyAccounts, queuedEvents) + + // Prepare everything for the next round of reorg + curDone, nextDone = nextDone, make(chan struct{}) + launchNextRun = false + + reset, dirtyAccounts = nil, nil + queuedEvents = make(map[common.Name]*txSortedMap) + } + + select { + case req := <-tp.reqResetCh: + // Reset request: update head if request is already pending. + if reset == nil { + reset = req + } else { + reset.newHead = req.newHead + } + launchNextRun = true + tp.reorgDoneCh <- nextDone + + case req := <-tp.reqPromoteCh: + // Promote request: update name set if request is already pending. + if dirtyAccounts == nil { + dirtyAccounts = req + } else { + dirtyAccounts.merge(req) + } + launchNextRun = true + tp.reorgDoneCh <- nextDone + + case tx := <-tp.queueTxEventCh: + // Queue up the event, but don't schedule a reorg. It's up to the caller to + // request one later if they want the events sent. + name := tx.GetActions()[0].Sender() + if _, ok := queuedEvents[name]; !ok { + queuedEvents[name] = newTxSortedMap() + } + queuedEvents[name].Put(tx) + + case <-curDone: + curDone = nil + + case <-tp.reorgShutdownCh: + // Wait for current run to finish. + if curDone != nil { + <-curDone + } + close(nextDone) + return + } + } +} + // loop is the transaction pool's main feed loop, waiting for and reacting to // outside blockchain feeds as well as for various reporting and transaction // eviction feeds. 
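The scheduleReorgLoop added above is the heart of the change: it coalesces reset and promotion requests, keeps at most one background run in flight, and answers every caller with the done channel of the run that will cover its request. A self-contained sketch of that scheduling idiom, using plain strings as the work items instead of the pool's request types:

package main

import (
	"fmt"
	"sync"
	"time"
)

// scheduler is a stripped-down model of scheduleReorgLoop: callers submit work
// requests, the loop coalesces them, runs at most one background batch at a time,
// and hands every caller the done channel of the run that will cover its request.
type scheduler struct {
	reqCh  chan string
	doneCh chan chan struct{}
	quit   chan struct{}
	wg     sync.WaitGroup
}

func newScheduler() *scheduler {
	s := &scheduler{
		reqCh:  make(chan string),
		doneCh: make(chan chan struct{}),
		quit:   make(chan struct{}),
	}
	s.wg.Add(1)
	go s.loop()
	return s
}

// request submits work and returns a channel that is closed once that work ran.
func (s *scheduler) request(work string) chan struct{} {
	select {
	case s.reqCh <- work:
		return <-s.doneCh
	case <-s.quit:
		return s.quit
	}
}

func (s *scheduler) loop() {
	defer s.wg.Done()
	var (
		curDone  chan struct{}         // non-nil while a background run is active
		nextDone = make(chan struct{}) // handed out to callers of the next run
		launch   bool
		pending  []string // coalesced requests for the next run
	)
	for {
		if curDone == nil && launch {
			go func(done chan struct{}, batch []string) {
				defer close(done)
				time.Sleep(10 * time.Millisecond) // the "reorg" work
				fmt.Println("ran batch:", batch)
			}(nextDone, pending)
			curDone, nextDone = nextDone, make(chan struct{})
			launch, pending = false, nil
		}
		select {
		case w := <-s.reqCh:
			pending = append(pending, w) // coalesce into the next run
			launch = true
			s.doneCh <- nextDone
		case <-curDone:
			curDone = nil
		case <-s.quit:
			if curDone != nil {
				<-curDone
			}
			close(nextDone)
			return
		}
	}
}

func main() {
	s := newScheduler()
	<-s.request("promote:alice") // wait for the covering run, like addRemotesSync does
	close(s.quit)
	s.wg.Wait()
}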
@@ -163,13 +257,12 @@ func (tp *TxPool) loop() {
 		case ev := <-tp.chainHeadCh:
 			block := ev.Data.(*types.Block)
 			if block != nil {
-				tp.mu.Lock()
-				tp.reset(head.Header(), block.Header())
+				tp.requestReset(head.Header(), block.Header())
 				head = block
-				tp.mu.Unlock()
 			}
 		// Be unsubscribed due to system stopped
 		case <-tp.chainHeadSub.Err():
+			close(tp.reorgShutdownCh)
 			return
 		// Handle stats reporting ticks
 		case <-report.C:
@@ -226,12 +319,101 @@ func (tp *TxPool) loop() {
 	}
 }
 
-// lockedReset is a wrapper around reset to allow calling it in a thread safe
-// manner. This method is only ever used in the tester!
-func (tp *TxPool) lockedReset(oldHead, newHead *types.Header) {
+// requestReset requests a pool reset to the new head block.
+// The returned channel is closed when the reset has occurred.
+func (tp *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
+	select {
+	case tp.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
+		return <-tp.reorgDoneCh
+	case <-tp.reorgShutdownCh:
+		return tp.reorgShutdownCh
+	}
+}
+
+// requestPromoteExecutables requests transaction promotion checks for the given names.
+// The returned channel is closed when the promotion checks have occurred.
+func (tp *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
+	select {
+	case tp.reqPromoteCh <- set:
+		return <-tp.reorgDoneCh
+	case <-tp.reorgShutdownCh:
+		return tp.reorgShutdownCh
+	}
+}
+
+// queueTxEvent enqueues a transaction event to be sent in the next reorg run.
+func (tp *TxPool) queueTxEvent(tx *types.Transaction) {
+	select {
+	case tp.queueTxEventCh <- tx:
+	case <-tp.reorgShutdownCh:
+	}
+}
+
+// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
+func (tp *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Name]*txSortedMap) {
+	defer close(done)
+
+	var promoteNames []common.Name
+	if dirtyAccounts != nil {
+		promoteNames = dirtyAccounts.flatten()
+	}
 	tp.mu.Lock()
-	defer tp.mu.Unlock()
-	tp.reset(oldHead, newHead)
+	if reset != nil {
+		// Reset from the old head to the new, rescheduling any reorged transactions
+		tp.reset(reset.oldHead, reset.newHead)
+
+		// Nonces were reset, discard any events that became stale
+		for name := range events {
+			nonce, _ := tp.pendingAccountManager.GetNonce(name)
+			events[name].Forward(nonce)
+			if events[name].Len() == 0 {
+				delete(events, name)
+			}
+		}
+		// Reset needs promote for all names
+		promoteNames = promoteNames[:0]
+		for name := range tp.queue {
+			promoteNames = append(promoteNames, name)
+		}
+	}
+
+	// Check for pending transactions for every account that sent new ones
+	promoted := tp.promoteExecutables(promoteNames)
+	for _, tx := range promoted {
+		name := tx.GetActions()[0].Sender()
+		if _, ok := events[name]; !ok {
+			events[name] = newTxSortedMap()
+		}
+		events[name].Put(tx)
+	}
+	// If a new block appeared, validate the pool of pending transactions. This will
+	// remove any transaction that has been included in the block or was invalidated
+	// because of another transaction (e.g. higher gas price).
+	if reset != nil {
+		tp.demoteUnexecutables()
+	}
+	// Ensure pool.queue and pool.pending sizes stay within the configured limits.
+ tp.truncatePending() + tp.truncateQueue() + + // Update all accounts to the latest known pending nonce + for name, list := range tp.pending { + txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway + tp.pendingAccountManager.SetNonce(name, txs[len(txs)-1].GetActions()[0].Nonce()+1) + } + tp.mu.Unlock() + + // Notify subsystems for newly added transactions + if len(events) > 0 { + var txs []*types.Transaction + for _, set := range events { + txs = append(txs, set.Flatten()...) + } + events := []*event.Event{ + {Typecode: event.NewTxs, Data: txs}, + } + event.SendEvents(events) + } } // reset retrieves the current state of the blockchain and ensures the content @@ -255,6 +437,24 @@ func (tp *TxPool) reset(oldHead, newHead *types.Header) { rem = tp.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) add = tp.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) ) + + if rem == nil { + // This can happen if a setHead is performed, where we simply discard the old + // head from the chain. + // If that is the case, we don't have the lost transactions any more, and + // there's nothing to add + if newNum < oldNum { + // If the reorg ended up on a lower number, it's indicative of setHead being the cause + log.Debug("Skipping transaction reset caused by setHead", + "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) + } else { + // If we reorged to a same or higher number, then it's not a case of setHead + log.Warn("Transaction pool reset with missing oldhead", + "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) + } + return + } + for rem.NumberU64() > add.NumberU64() { discarded = append(discarded, rem.Transactions()...) if rem = tp.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { @@ -308,32 +508,6 @@ func (tp *TxPool) reset(oldHead, newHead *types.Header) { log.Debug("Reinjecting stale transactions", "count", len(reinject)) SenderCacher.recover(tp.signer, reinject) tp.addTxsLocked(reinject, false) - - // validate the pool of pending transactions, this will remove - // any transactions that have been included in the block or - // have been invalidated because of another transaction (e.g. - // higher gas price) - tp.demoteUnexecutables() - - // Update all accounts to the latest known pending nonce - for name, list := range tp.pending { - txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway - // todo change transaction action nonce - if err := tp.pendingAccountManager.SetNonce(name, txs[len(txs)-1].GetActions()[0].Nonce()+1); err != nil { - if err != am.ErrAccountIsDestroy { - log.Error("Failed to pendingAccountManager SetNonce", "err", err) - return - } - delete(tp.pending, name) - delete(tp.beats, name) - delete(tp.queue, name) - log.Debug("Remove all destory account ", "name", name) - } - } - - // Check the queue and move transactions over to the pending if possible - // or remove those that have become invalid - tp.promoteExecutables(nil) } // Stop terminates the transaction tp. @@ -420,8 +594,6 @@ func (tp *TxPool) Content() (map[common.Name][]*types.Transaction, map[common.Na // account and sorted by nonce. The returned transaction set is a copy and can be // freely modified by calling code. 
 func (tp *TxPool) Pending() (map[common.Name][]*types.Transaction, error) {
-	tp.pendingWg.Add(1)
-	defer tp.pendingWg.Done()
 	tp.mu.Lock()
 	defer tp.mu.Unlock()
 	pending := make(map[common.Name][]*types.Transaction)
@@ -563,20 +735,13 @@ func (tp *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
 		// New transaction is better, replace old one
 		if old != nil {
 			tp.all.Remove(old.Hash())
-			tp.priced.Removed()
+			tp.priced.Removed(1)
 		}
 		tp.all.Add(tx)
 		tp.priced.Put(tx)
 		tp.journalTx(from, tx)
-
+		tp.queueTxEvent(tx)
 		log.Trace("Pooled new executable transaction", "hash", hash, "from", from)
-
-		// We've directly injected a replacement transaction, notify subsystems
-		events := []*event.Event{
-			{Typecode: event.NewTxs, Data: []*types.Transaction{tx}},
-		}
-		go event.SendEvents(events)
-
 		return old != nil, nil
 	}
 	// New transaction isn't replacing a pending one, push into queue
@@ -611,7 +776,7 @@ func (tp *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, erro
 	// Discard any previous transaction and mark this
 	if old != nil {
 		tp.all.Remove(old.Hash())
-		tp.priced.Removed()
+		tp.priced.Removed(1)
 	}
 	if tp.all.Get(hash) == nil {
 		tp.all.Add(tx)
@@ -647,13 +812,13 @@ func (tp *TxPool) promoteTx(name common.Name, hash common.Hash, tx *types.Transa
 	if !inserted {
 		// An older transaction was better, discard this
 		tp.all.Remove(hash)
-		tp.priced.Removed()
+		tp.priced.Removed(1)
 		return false
 	}
 	// Otherwise discard any previous transaction and mark this
 	if old != nil {
 		tp.all.Remove(old.Hash())
-		tp.priced.Removed()
+		tp.priced.Removed(1)
 	}
 
 	// Failsafe to work around direct pending inserts (tests)
@@ -668,76 +833,56 @@ func (tp *TxPool) promoteTx(name common.Name, hash common.Hash, tx *types.Transa
 	return true
 }
 
-// AddLocal enqueues a single transaction into the pool if it is valid, marking
-// the sender as a local one in the mean time, ensuring it goes around the local
-// pricing constraints.
-func (tp *TxPool) AddLocal(tx *types.Transaction) error {
-	return tp.addTx(tx, !tp.config.NoLocals)
-}
-
-// AddRemote enqueues a single transaction into the pool if it is valid. If the
-// sender is not among the locally tracked ones, full pricing constraints will
-// apply.
-func (tp *TxPool) AddRemote(tx *types.Transaction) error {
-	return tp.addTx(tx, false)
+// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
+// senders as local ones, ensuring they go around the local pricing constraints.
+//
+// This method is used to add transactions from the RPC API and performs synchronous pool
+// reorganization and event propagation.
+func (tp *TxPool) AddLocals(txs []*types.Transaction) []error {
+	return tp.addTxs(txs, !tp.config.NoLocals, true)
 }
 
-// AddLocals enqueues a batch of transactions into the pool if they are valid,
-// marking the senders as a local ones in the mean time, ensuring they go around
-// the local pricing constraints.
-func (tp *TxPool) AddLocals(txs []*types.Transaction) []error {
-	return tp.addTxs(txs, !tp.config.NoLocals)
+// AddLocal enqueues a single local transaction into the pool if it is valid. This is
+// a convenience wrapper around AddLocals.
+func (tp *TxPool) AddLocal(tx *types.Transaction) error {
+	errs := tp.AddLocals([]*types.Transaction{tx})
+	return errs[0]
 }
 
-// AddRemotes enqueues a batch of transactions into the pool if they are valid.
-// If the senders are not among the locally tracked ones, full pricing constraints
-// will apply.
+// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the +// senders are not among the locally tracked ones, full pricing constraints will apply. +// +// This method is used to add transactions from the p2p network and does not wait for pool +// reorganization and internal event propagation. func (tp *TxPool) AddRemotes(txs []*types.Transaction) []error { - return tp.addTxs(txs, false) + return tp.addTxs(txs, false, false) } -// addTx enqueues a single transaction into the pool if it is valid. -func (tp *TxPool) addTx(tx *types.Transaction, local bool) error { - if err := tx.Check(tp.chain.Config()); err != nil { - return err - } - - // If the transaction is already known, discard it - if tp.all.Get(tx.Hash()) != nil { - log.Error("Discarding already known transaction", "hash", tx.Hash()) - return fmt.Errorf("known transaction: %x", tx.Hash()) - } - - // Cache senders in transactions before obtaining lock - for _, action := range tx.GetActions() { - _, err := types.RecoverMultiKey(tp.signer, action, tx) - if err != nil { - return err - } - } +// This is like AddRemotes, but waits for pool reorganization. Tests use this method. +func (tp *TxPool) addRemotesSync(txs []*types.Transaction) []error { + return tp.addTxs(txs, false, true) +} - tp.pendingWg.Wait() - tp.mu.Lock() - defer tp.mu.Unlock() +// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method. +func (tp *TxPool) addRemoteSync(tx *types.Transaction) error { + errs := tp.addRemotesSync([]*types.Transaction{tx}) + return errs[0] +} - // Try to inject the transaction and update any state - replace, err := tp.add(tx, local) - if err != nil { - return err - } - // If we added a new transaction, run promotion checks and return - if !replace { - // todo - from := tx.GetActions()[0].Sender() - tp.promoteExecutables([]common.Name{from}) - } - return nil +// AddRemote enqueues a single transaction into the pool if it is valid. This is a convenience +// wrapper around AddRemotes. +// +// Deprecated: use AddRemotes +func (tp *TxPool) AddRemote(tx *types.Transaction) error { + errs := tp.AddRemotes([]*types.Transaction{tx}) + return errs[0] } // addTxs attempts to queue a batch of transactions if they are valid. -func (tp *TxPool) addTxs(txs []*types.Transaction, local bool) []error { +func (tp *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { var addedTxs []*types.Transaction + // Cache senders in transactions before obtaining lock (pool.signer is immutable) for _, tx := range txs { // If the transaction is already known, discard it if tp.all.Get(tx.Hash()) != nil { @@ -758,37 +903,30 @@ func (tp *TxPool) addTxs(txs []*types.Transaction, local bool) []error { addedTxs = append(addedTxs, tx) } - tp.pendingWg.Wait() tp.mu.Lock() - defer tp.mu.Unlock() + errs, dirtyNames := tp.addTxsLocked(txs, local) + tp.mu.Unlock() - return tp.addTxsLocked(addedTxs, local) + done := tp.requestPromoteExecutables(dirtyNames) + if sync { + <-done + } + return errs } -// addTxsLocked attempts to queue a batch of transactions if they are valid, -// whilst assuming the transaction pool lock is already held. -func (tp *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { - // Add the batch of transaction, tracking the accepted ones - dirty := make(map[common.Name]struct{}) +// addTxsLocked attempts to queue a batch of transactions if they are valid. +// The transaction pool lock must be held. 
+func (tp *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) { + dirty := newAccountSet(tp.signer) errs := make([]error, len(txs)) - for i, tx := range txs { - var replace bool - if replace, errs[i] = tp.add(tx, local); errs[i] == nil && !replace { - from := tx.GetActions()[0].Sender() - dirty[from] = struct{}{} + replaced, err := tp.add(tx, local) + errs[i] = err + if err == nil && !replaced { + dirty.addTx(tx) } } - - // Only reprocess the internal state if something was actually added - if len(dirty) > 0 { - names := make([]common.Name, 0, len(dirty)) - for name := range dirty { - names = append(names, name) - } - tp.promoteExecutables(names) - } - return errs + return errs, dirty } // Status returns the status (unknown/pending/queued) of a batch of transactions @@ -831,7 +969,7 @@ func (tp *TxPool) removeTx(hash common.Hash, outofbound bool) { // Remove it from the list of known transactions tp.all.Remove(hash) if outofbound { - tp.priced.Removed() + tp.priced.Removed(1) } // Remove the transaction from the pending lists and reset the account nonce if pending := tp.pending[from]; pending != nil { @@ -873,17 +1011,10 @@ func (tp *TxPool) removeTx(hash common.Hash, outofbound bool) { // promoteExecutables moves transactions that have become processable from the // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. -func (tp *TxPool) promoteExecutables(accounts []common.Name) { +func (tp *TxPool) promoteExecutables(accounts []common.Name) []*types.Transaction { // Track the promoted transactions to broadcast them at once var promoted []*types.Transaction - // Gather all the accounts potentially needing updates - if accounts == nil { - accounts = make([]common.Name, 0, len(tp.queue)) - for name := range tp.queue { - accounts = append(accounts, name) - } - } // Iterate over all accounts and promote any executable transactions for _, name := range accounts { list := tp.queue[name] @@ -891,179 +1022,177 @@ func (tp *TxPool) promoteExecutables(accounts []common.Name) { continue // Just in case someone calls with a non existing account } // Drop all transactions that are deemed too old (low nonce) - nonce, err := tp.curAccountManager.GetNonce(name) - if err != nil { - log.Error("promoteExecutables current account manager get nonce err", "name", name, "err", err) - } - for _, tx := range list.Forward(nonce) { + nonce, _ := tp.curAccountManager.GetNonce(name) + forwards := list.Forward(nonce) + for _, tx := range forwards { hash := tx.Hash() - log.Trace("Removed old queued transaction", "hash", hash) tp.all.Remove(hash) - tp.priced.Removed() - } - // Drop all transactions that are too costly (low balance or out of gas or no permissions) - // todo assetID - balance, err := tp.curAccountManager.GetAccountBalanceByID(name, tp.config.GasAssetID, 0) - if err != nil { - log.Error("promoteExecutables current account manager get balance err ", "name", name, "assetID", tp.config.GasAssetID, "err", err) + log.Trace("Removed old queued transaction", "hash", hash) } + // Drop all transactions that are too costly (low balance or out of gas) + balance, _ := tp.curAccountManager.GetAccountBalanceByID(name, tp.config.GasAssetID, 0) drops, _ := list.Filter(balance, tp.currentMaxGas, tp.signer, tp.curAccountManager.GetAccountBalanceByID, tp.curAccountManager.RecoverTx) for _, tx := range drops { hash := tx.Hash() - log.Trace("Removed unpayable queued or no permissions transaction", "hash", hash) 
 			tp.all.Remove(hash)
-			tp.priced.Removed()
+			log.Trace("Removed unpayable queued transaction", "hash", hash)
 		}
 		// Gather all executable transactions and promote them
-		nonce, err = tp.pendingAccountManager.GetNonce(name)
-		if err != nil && err != am.ErrAccountNotExist {
-			log.Error("promoteExecutables pending account manager get nonce err ", "name", name, "err", err)
-		}
-		for _, tx := range list.Ready(nonce) {
+		nonce, _ = tp.pendingAccountManager.GetNonce(name)
+		readies := list.Ready(nonce)
+		for _, tx := range readies {
 			hash := tx.Hash()
 			if tp.promoteTx(name, hash, tx) {
 				log.Trace("Promoting queued transaction", "hash", hash)
 				promoted = append(promoted, tx)
 			}
 		}
+		// Drop all transactions over the allowed limit
+		var caps []*types.Transaction
 		if !tp.locals.contains(name) {
-			for _, tx := range list.Cap(int(tp.config.AccountQueue)) {
+			caps = list.Cap(int(tp.config.AccountQueue))
+			for _, tx := range caps {
 				hash := tx.Hash()
 				tp.all.Remove(hash)
-				tp.priced.Removed()
 				log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
 			}
 		}
+		// Mark all the items dropped as removed
+		tp.priced.Removed(len(forwards) + len(drops) + len(caps))
+		// Delete the entire queue entry if it became empty.
 		if list.Empty() {
			delete(tp.queue, name)
 		}
 	}
-	// Notify subsystem for new promoted transactions.
-	if len(promoted) > 0 {
-		events := []*event.Event{
-			{Typecode: event.NewTxs, Data: promoted},
-		}
-		go event.SendEvents(events)
-	}
-	// If the pending limit is overflown, start equalizing allowances
+	return promoted
+}
+
+// truncatePending removes transactions from the pending queue if the pool is above the
+// pending limit. The algorithm tries to reduce transaction counts by an approximately
+// equal number for all accounts with many pending transactions.
+func (tp *TxPool) truncatePending() { pending := uint64(0) for _, list := range tp.pending { pending += uint64(list.Len()) } - if pending > tp.config.GlobalSlots { - // Assemble a spam order to penalize large transactors first - spammers := prque.New() - for name, list := range tp.pending { - // Only evict transactions from high rollers - if !tp.locals.contains(name) && uint64(list.Len()) > tp.config.AccountSlots { - spammers.Push(name, float32(list.Len())) - } - } - // Gradually drop transactions from offenders - offenders := []common.Name{} - for pending > tp.config.GlobalSlots && !spammers.Empty() { - // Retrieve the next offender if not local name - offender, _ := spammers.Pop() - offenders = append(offenders, offender.(common.Name)) - - // Equalize balances until all the same or below threshold - if len(offenders) > 1 { - // Calculate the equalization threshold for all current offenders - threshold := tp.pending[offender.(common.Name)].Len() - - // Iteratively reduce all offenders until below limit or threshold reached - for pending > tp.config.GlobalSlots && tp.pending[offenders[len(offenders)-2]].Len() > threshold { - for i := 0; i < len(offenders)-1; i++ { - list := tp.pending[offenders[i]] - for _, tx := range list.Cap(list.Len() - 1) { - // Drop the transaction from the global pools too - hash := tx.Hash() - tp.all.Remove(hash) - tp.priced.Removed() - - // Update the account nonce to the dropped transaction - // todo change action - pnonce, err := tp.pendingAccountManager.GetNonce(offenders[i]) - if err != nil && err != am.ErrAccountNotExist { - log.Error("promoteExecutables pending account manager get nonce err ", "name", offenders[i], "err", err) - } - if nonce := tx.GetActions()[0].Nonce(); pnonce > nonce { - if err := tp.pendingAccountManager.SetNonce(offenders[i], nonce); err != nil { - log.Error("promoteExecutables pending account manager set nonce err ", "name", offenders[i], "nonce", nonce, "err", err) - } - } - log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) - } - pending-- - } - } - } + if pending <= tp.config.GlobalSlots { + return + } + + // Assemble a spam order to penalize large transactors first + spammers := prque.New() + for name, list := range tp.pending { + // Only evict transactions from high rollers + if !tp.locals.contains(name) && uint64(list.Len()) > tp.config.AccountSlots { + spammers.Push(name, float32(list.Len())) } - // If still above threshold, reduce to limit or min allowance - if pending > tp.config.GlobalSlots && len(offenders) > 0 { - for pending > tp.config.GlobalSlots && uint64(tp.pending[offenders[len(offenders)-1]].Len()) > tp.config.AccountSlots { - for _, name := range offenders { - list := tp.pending[name] - for _, tx := range list.Cap(list.Len() - 1) { + } + // Gradually drop transactions from offenders + offenders := []common.Name{} + for pending > tp.config.GlobalSlots && !spammers.Empty() { + // Retrieve the next offender if not local address + offender, _ := spammers.Pop() + offenders = append(offenders, offender.(common.Name)) + + // Equalize balances until all the same or below threshold + if len(offenders) > 1 { + // Calculate the equalization threshold for all current offenders + threshold := tp.pending[offender.(common.Name)].Len() + + // Iteratively reduce all offenders until below limit or threshold reached + for pending > tp.config.GlobalSlots && tp.pending[offenders[len(offenders)-2]].Len() > threshold { + for i := 0; i < len(offenders)-1; i++ { + list := tp.pending[offenders[i]] + + caps := 
list.Cap(list.Len() - 1)
+						for _, tx := range caps {
 							// Drop the transaction from the global pools too
 							hash := tx.Hash()
 							tp.all.Remove(hash)
-							tp.priced.Removed()
 
 							// Update the account nonce to the dropped transaction
-							pnonce, err := tp.pendingAccountManager.GetNonce(name)
-							if err != nil && err != am.ErrAccountNotExist {
-								log.Error("promoteExecutables pending account manager get nonce err ", "name", name, "err", err)
-							}
+							pnonce, _ := tp.pendingAccountManager.GetNonce(offenders[i])
 							if nonce := tx.GetActions()[0].Nonce(); pnonce > nonce {
-								tp.pendingAccountManager.SetNonce(name, nonce)
+								tp.pendingAccountManager.SetNonce(offenders[i], nonce)
 							}
 							log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
 						}
+						tp.priced.Removed(len(caps))
 						pending--
 					}
 				}
 			}
 		}
-	// If we've queued more transactions than the hard limit, drop oldest ones
+
+	// If still above threshold, reduce to limit or min allowance
+	if pending > tp.config.GlobalSlots && len(offenders) > 0 {
+		for pending > tp.config.GlobalSlots && uint64(tp.pending[offenders[len(offenders)-1]].Len()) > tp.config.AccountSlots {
+			for _, name := range offenders {
+				list := tp.pending[name]
+
+				caps := list.Cap(list.Len() - 1)
+				for _, tx := range caps {
+					// Drop the transaction from the global pools too
+					hash := tx.Hash()
+					tp.all.Remove(hash)
+
+					// Update the account nonce to the dropped transaction
+					pnonce, _ := tp.pendingAccountManager.GetNonce(name)
+
+					if nonce := tx.GetActions()[0].Nonce(); pnonce > nonce {
+						tp.pendingAccountManager.SetNonce(name, nonce)
+					}
+					log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
+				}
+				tp.priced.Removed(len(caps))
+				pending--
+			}
+		}
+	}
+}
+
+// truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit.
+func (tp *TxPool) truncateQueue() { queued := uint64(0) for _, list := range tp.queue { queued += uint64(list.Len()) } - if queued > tp.config.GlobalQueue { - // Sort all accounts with queued transactions by heartbeat - names := make(namesByHeartbeat, 0, len(tp.queue)) - for name := range tp.queue { - if !tp.locals.contains(name) { // don't drop locals - names = append(names, nameByHeartbeat{name, tp.beats[name]}) - } + if queued <= tp.config.GlobalQueue { + return + } + + // Sort all accounts with queued transactions by heartbeat + names := make(namesByHeartbeat, 0, len(tp.queue)) + for name := range tp.queue { + if !tp.locals.contains(name) { // don't drop locals + names = append(names, nameByHeartbeat{name, tp.beats[name]}) } - sort.Sort(names) + } + sort.Sort(names) - // Drop transactions until the total is below the limit or only locals remain - for drop := queued - tp.config.GlobalQueue; drop > 0 && len(names) > 0; { - nameBeat := names[len(names)-1] - list := tp.queue[nameBeat.name] + // Drop transactions until the total is below the limit or only locals remain + for drop := queued - tp.config.GlobalQueue; drop > 0 && len(names) > 0; { + name := names[len(names)-1] + list := tp.queue[name.name] - names = names[:len(names)-1] + names = names[:len(names)-1] - // Drop all transactions if they are less than the overflow - if size := uint64(list.Len()); size <= drop { - for _, tx := range list.Flatten() { - tp.removeTx(tx.Hash(), true) - } - drop -= size - continue - } - // Otherwise drop only last few transactions - txs := list.Flatten() - for i := len(txs) - 1; i >= 0 && drop > 0; i-- { - tp.removeTx(txs[i].Hash(), true) - drop-- + // Drop all transactions if they are less than the overflow + if size := uint64(list.Len()); size <= drop { + for _, tx := range list.Flatten() { + tp.removeTx(tx.Hash(), true) } + drop -= size + continue + } + // Otherwise drop only last few transactions + txs := list.Flatten() + for i := len(txs) - 1; i >= 0 && drop > 0; i-- { + tp.removeTx(txs[i].Hash(), true) + drop-- } } } @@ -1084,7 +1213,7 @@ func (tp *TxPool) demoteUnexecutables() { hash := tx.Hash() log.Trace("Removed old pending transaction", "hash", hash) tp.all.Remove(hash) - tp.priced.Removed() + tp.priced.Removed(1) } // Drop all transactions that are too costly (low balance or out of gas or no permissions), and queue any invalids back for later @@ -1098,7 +1227,7 @@ func (tp *TxPool) demoteUnexecutables() { hash := tx.Hash() log.Trace("Removed unpayable pending or no permissions transaction", "hash", hash) tp.all.Remove(hash) - tp.priced.Removed() + tp.priced.Removed(1) } for _, tx := range invalids { diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index 9184aa2b..61488e4a 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -78,7 +78,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) { t.Fatalf("Invalid nonce, want 0, got %d", nonce) } - pool.AddRemotes([]*types.Transaction{tx0, tx1}) + pool.addRemotesSync([]*types.Transaction{tx0, tx1}) nonce, err = pool.State().GetNonce(fname) if err != nil { @@ -90,7 +90,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) { // trigger state change in the background trigger = true - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) _, err = pool.Pending() if err != nil { @@ -167,10 +167,11 @@ func TestTransactionQueue(t *testing.T) { pool.curAccountManager.AddAccountBalanceByID(fname, assetID, big.NewInt(1000)) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) + 
pool.enqueueTx(tx.Hash(), tx) + <-pool.requestPromoteExecutables(newAccountSet(pool.signer, fname)) - pool.promoteExecutables([]common.Name{fname}) if len(pool.pending) != 1 { t.Fatal("expected valid txs to be 1 is", len(pool.pending)) } @@ -178,9 +179,9 @@ func TestTransactionQueue(t *testing.T) { tx = transaction(1, fname, tname, 100, fkey) pool.curAccountManager.SetNonce(fname, 2) - pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables([]common.Name{fname}) + <-pool.requestPromoteExecutables(newAccountSet(pool.signer, fname)) + if _, ok := pool.pending[fname].txs.items[tx.GetActions()[0].Nonce()]; ok { t.Fatal("expected transaction to be in tx pool") } @@ -199,13 +200,13 @@ func TestTransactionQueue(t *testing.T) { tx3 := transaction(11, fname, tname, 100, fkey) pool.curAccountManager.AddAccountBalanceByID(fname, assetID, big.NewInt(1000)) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) pool.enqueueTx(tx1.Hash(), tx1) pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx3.Hash(), tx3) - pool.promoteExecutables([]common.Name{fname}) + <-pool.requestPromoteExecutables(newAccountSet(pool.signer, fname)) if len(pool.pending) != 1 { t.Fatal("expected tx pool to be 1, got", len(pool.pending)) @@ -234,24 +235,25 @@ func TestTransactionQueue(t *testing.T) { t.Fatal(err) } - pool.promoteExecutables(nil) + <-pool.requestPromoteExecutables(newAccountSet(pool.signer, fname)) if len(pool.queue) != 0 { t.Fatal("expected len(queue) == 0, got", pool.queue[fname].Len()) } pool.demoteUnexecutables() + if len(pool.pending) != 0 { t.Fatal("expected tx pool to be 0, got", len(pool.pending)) } } func TestTransactionChainFork(t *testing.T) { - var ( fname = common.Name("fromname") tname = common.Name("totestname") assetID = uint64(0) ) + pool, manager := setupTxPool(fname) defer pool.Stop() fkey := generateAccount(t, fname, manager) @@ -273,7 +275,7 @@ func TestTransactionChainFork(t *testing.T) { newmanager.AddAccountBalanceByID(fname, assetID, big.NewInt(100000000000000)) pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) } @@ -292,7 +294,6 @@ func TestTransactionChainFork(t *testing.T) { } func TestTransactionDoubleNonce(t *testing.T) { - var ( fname = common.Name("fromname") tname = common.Name("totestname") @@ -319,7 +320,7 @@ func TestTransactionDoubleNonce(t *testing.T) { newmanager.AddAccountBalanceByID(fname, assetID, big.NewInt(100000000000000)) pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) } resetAsset() @@ -347,7 +348,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if replace, err := pool.add(tx2, false); err != nil || !replace { t.Fatalf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } - pool.promoteExecutables([]common.Name{fname}) + <-pool.requestPromoteExecutables(newAccountSet(pool.signer, fname)) if pool.pending[fname].Len() != 1 { t.Fatal("expected 1 pending transactions, got", pool.pending[fname].Len()) } @@ -356,7 +357,8 @@ func TestTransactionDoubleNonce(t *testing.T) { } // Add the third transaction and ensure it's not saved (smaller price) pool.add(tx3, false) - pool.promoteExecutables([]common.Name{fname}) + <-pool.requestPromoteExecutables(newAccountSet(pool.signer, fname)) + if pool.pending[fname].Len() != 1 { t.Fatal("expected 1 pending transactions, got", pool.pending[fname].Len()) } @@ -370,7 +372,6 @@ func TestTransactionDoubleNonce(t *testing.T) { } func 
TestTransactionMissingNonce(t *testing.T) { - var ( fname = common.Name("fromname") tname = common.Name("totestname") @@ -416,14 +417,14 @@ func TestTransactionNonceRecovery(t *testing.T) { manager.AddAccountBalanceByID(fname, assetID, big.NewInt(100000000000000)) pool.curAccountManager.SetNonce(fname, n) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) tx := transaction(n, fname, tname, 109000, fkey) if err := pool.AddRemote(tx); err != nil { t.Fatal(err) } // simulate some weird re-order of transactions and missing nonce(s) pool.curAccountManager.SetNonce(fname, n-1) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if fn, _ := pool.pendingAccountManager.GetNonce(fname); fn != n-1 { t.Fatalf("expected nonce to be %d, got %d", n-1, fn) } @@ -470,7 +471,7 @@ func TestTransactionDropping(t *testing.T) { if pool.all.Count() != 6 { t.Fatalf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6) } - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if pool.pending[fname].Len() != 3 { t.Fatalf("pending transaction mismatch: have %d, want %d", pool.pending[fname].Len(), 3) } @@ -482,7 +483,7 @@ func TestTransactionDropping(t *testing.T) { } // Reduce the balance of the account, and check that invalidated transactions are dropped pool.curAccountManager.SubAccountBalanceByID(fname, assetID, big.NewInt(750)) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if _, ok := pool.pending[fname].txs.items[tx0.GetActions()[0].Nonce()]; !ok { t.Fatalf("funded pending transaction missing: %v", tx0) @@ -507,7 +508,7 @@ func TestTransactionDropping(t *testing.T) { } // Reduce the block gas limit, check that invalidated transactions are dropped pool.chain.(*testBlockChain).gasLimit = 100 - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if _, ok := pool.pending[fname].txs.items[tx0.GetActions()[0].Nonce()]; !ok { t.Fatalf("funded pending transaction missing: %v", tx0) @@ -530,7 +531,6 @@ func TestTransactionDropping(t *testing.T) { // of fund), all consecutive (still valid, but not executable) transactions are // postponed back into the future queue to prevent broadcasting them. func TestTransactionPostponing(t *testing.T) { - // Create the pool to test the postponing with statedb, _ := state.New(common.Hash{}, state.NewDatabase(mdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} @@ -567,7 +567,7 @@ func TestTransactionPostponing(t *testing.T) { txs = append(txs, tx) } } - for i, err := range pool.AddRemotes(txs) { + for i, err := range pool.addRemotesSync(txs) { if err != nil { t.Fatalf("tx %d: failed to add transactions: %v", i, err) } @@ -582,7 +582,7 @@ func TestTransactionPostponing(t *testing.T) { if pool.all.Count() != len(txs) { t.Fatalf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)) } - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) { t.Fatalf("pending transaction mismatch: have %d, want %d", pending, len(txs)) } @@ -597,7 +597,7 @@ func TestTransactionPostponing(t *testing.T) { pool.curAccountManager.SubAccountBalanceByID(name, assetID, big.NewInt(1010)) } - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) // The first account's first transaction remains valid, check that subsequent // ones are either filtered out, or queued up for later. 
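Throughout this test file the old synchronous calls (lockedReset, direct promoteExecutables, AddRemotes followed by immediate assertions) are replaced with requestReset, requestPromoteExecutables and addRemotesSync, and the tests receive from the returned channel before inspecting pool state, since the actual work now runs on the background goroutine. A minimal standalone model of that wait-on-done idiom, with simplified stand-ins rather than the pool's types:

package main

import "fmt"

// promoteAsync models the pool's new request/notify API used by the tests above:
// the work is handed to a background goroutine and the caller gets back a channel
// that is closed once the work (here, a "promotion") has actually happened.
func promoteAsync(pending map[string]int, name string) chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		pending[name]++ // the "promotion"
	}()
	return done
}

func main() {
	pending := make(map[string]int)
	// Like the updated tests, receive from the returned channel before asserting,
	// otherwise the background update may not have happened yet.
	<-promoteAsync(pending, "fromname")
	fmt.Println(pending["fromname"]) // 1
}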
@@ -649,7 +649,6 @@ func TestTransactionPostponing(t *testing.T) { // transactions from an origin account, filling the nonce gap moves all queued // ones into the pending pool. func TestTransactionGapFilling(t *testing.T) { - var ( fname = common.Name("fromname") tname = common.Name("totestname") @@ -668,12 +667,11 @@ func TestTransactionGapFilling(t *testing.T) { defer sub.Unsubscribe() // Create a pending and a queued transaction with a nonce-gap in between - if err := pool.AddRemote(transaction(0, fname, tname, 1000000, fkey)); err != nil { - t.Fatalf("failed to add pending transaction: %v", err) - } - if err := pool.AddRemote(transaction(2, fname, tname, 1000000, fkey)); err != nil { - t.Fatalf("failed to add queued transaction: %v", err) - } + pool.addRemotesSync([]*types.Transaction{ + transaction(0, fname, tname, 1000000, fkey), + transaction(2, fname, tname, 1000000, fkey), + }) + pending, queued := pool.Stats() if pending != 1 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) @@ -689,10 +687,11 @@ func TestTransactionGapFilling(t *testing.T) { t.Fatalf("pool internal state corrupted: %v", err) } // Fill the nonce gap and ensure all transactions become pending - if err := pool.AddRemote(transaction(1, fname, tname, 1000000, fkey)); err != nil { + if err := pool.addRemoteSync(transaction(1, fname, tname, 1000000, fkey)); err != nil { t.Fatalf("failed to add gapped transaction: %v", err) } pending, queued = pool.Stats() + if pending != 3 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) } @@ -710,7 +709,6 @@ func TestTransactionGapFilling(t *testing.T) { // Tests that if the transaction count belonging to a single account goes above // some threshold, the higher transactions are dropped to prevent DOS attacks. 
func TestTransactionQueueAccountLimiting(t *testing.T) { - var ( fname = common.Name("fromname") tname = common.Name("totestname") @@ -725,7 +723,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ { - if err := pool.AddRemote(transaction(i, fname, tname, 1000000, fkey)); err != nil { + if err := pool.addRemoteSync(transaction(i, fname, tname, 1000000, fkey)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } if len(pool.pending) != 0 { @@ -759,7 +757,6 @@ func TestTransactionQueueGlobalLimitingNoLocals(t *testing.T) { } func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { - // Create the pool to test the limit enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(mdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 10000000, new(event.Feed)} @@ -803,7 +800,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { nonces[accs[randInt]]++ } // Import the batch and verify that limits have been enforced - pool.AddRemotes(txs) + pool.addRemotesSync(txs) queued := 0 for name, list := range pool.queue { @@ -949,7 +946,7 @@ func TestTransactionPendingLimiting(t *testing.T) { // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { - if err := pool.AddRemote(transaction(i, fname, tname, 1000000, fkey)); err != nil { + if err := pool.addRemoteSync(transaction(i, fname, tname, 1000000, fkey)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } if pool.pending[fname].Len() != int(i)+1 { @@ -1023,7 +1020,7 @@ func TestTransactionPoolRepricing(t *testing.T) { ltx := pricedTransaction(0, accs[3], tname, 1000000, big.NewInt(1), keys[3]) // Import the batch and that both pending and queued transactions match up - pool.AddRemotes(txs) + pool.addRemotesSync(txs) pool.AddLocal(ltx) pending, queued := pool.Stats() @@ -1086,13 +1083,13 @@ func TestTransactionPoolRepricing(t *testing.T) { t.Fatalf("pool internal state corrupted: %v", err) } // And we can fill gaps with properly priced transactions - if err := pool.AddRemote(pricedTransaction(1, accs[0], tname, 1000000, big.NewInt(2), keys[0])); err != nil { + if err := pool.addRemoteSync(pricedTransaction(1, accs[0], tname, 1000000, big.NewInt(2), keys[0])); err != nil { t.Fatalf("failed to add pending transaction: %v", err) } - if err := pool.AddRemote(pricedTransaction(0, accs[1], tname, 1000000, big.NewInt(2), keys[1])); err != nil { + if err := pool.addRemoteSync(pricedTransaction(0, accs[1], tname, 1000000, big.NewInt(2), keys[1])); err != nil { t.Fatalf("failed to add pending transaction: %v", err) } - if err := pool.AddRemote(pricedTransaction(2, accs[2], tname, 1000000, big.NewInt(2), keys[2])); err != nil { + if err := pool.addRemoteSync(pricedTransaction(2, accs[2], tname, 1000000, big.NewInt(2), keys[2])); err != nil { t.Fatalf("failed to add queued transaction: %v", err) } if err := validateEvents(events, 5); err != nil { @@ -1220,7 +1217,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { ltx := pricedTransaction(0, accs[2], tname, 1000000, big.NewInt(1), keys[2]) // Import the batch and that both pending and queued transactions match up - pool.AddRemotes(txs) + pool.addRemotesSync(txs) pool.AddLocal(ltx) pending, queued := pool.Stats() @@ -1329,7 +1326,7 @@ func TestTransactionPoolStableUnderpricing(t 
*testing.T) { for i := uint64(0); i < config.GlobalSlots; i++ { txs = append(txs, pricedTransaction(i, accs[0], tname, 1000000, big.NewInt(1), keys[0])) } - pool.AddRemotes(txs) + pool.addRemotesSync(txs) pending, queued := pool.Stats() if pending != int(config.GlobalSlots) { @@ -1446,68 +1443,6 @@ func TestTransactionReplacement(t *testing.T) { } } -// Tests that the transaction limits are enforced the same way irrelevant whether -// the transactions are added one by one or in batches. -func TestTransactionQueueLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 1) } -func TestTransactionPendingLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 0) } - -func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { - var ( - fname = common.Name("fromname") - tname = common.Name("totestname") - assetID = uint64(0) - ) - event.Reset() - pool, manager := setupTxPool(fname) - defer pool.Stop() - fkey := generateAccount(t, fname, manager, pool.pendingAccountManager) - generateAccount(t, tname, manager, pool.pendingAccountManager) - - pool.curAccountManager.AddAccountBalanceByID(fname, assetID, big.NewInt(10000000)) - - for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { - if err := pool.AddRemote(transaction(origin+i, fname, tname, 1000000, fkey)); err != nil { - t.Fatalf("tx %d: failed to add transaction: %v", i, err) - } - } - - // Add a batch of transactions to a pool in one big batch - var ( - fname1 = common.Name("fromname1") - tname1 = common.Name("totestname1") - ) - event.Reset() - pool1, manager1 := setupTxPool(fname1) - defer pool1.Stop() - fkey1 := generateAccount(t, fname1, manager1, pool1.pendingAccountManager) - generateAccount(t, tname1, manager1, pool1.pendingAccountManager) - - pool1.curAccountManager.AddAccountBalanceByID(fname1, assetID, big.NewInt(10000000)) - - txs := []*types.Transaction{} - for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { - txs = append(txs, transaction(origin+i, fname1, tname1, 1000000, fkey1)) - } - pool1.AddRemotes(txs) - - // Ensure the batch optimization honors the same pool mechanics - if len(pool.pending) != len(pool1.pending) { - t.Fatalf("pending transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool.pending), len(pool1.pending)) - } - if len(pool.queue) != len(pool1.queue) { - t.Fatalf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool.queue), len(pool1.queue)) - } - if pool.all.Count() != pool1.all.Count() { - t.Fatalf("total transaction count mismatch: one-by-one algo %d, batch algo %d", pool.all.Count(), pool1.all.Count()) - } - if err := validateTxPoolInternals(pool); err != nil { - t.Fatalf("pool 1 internal state corrupted: %v", err) - } - if err := validateTxPoolInternals(pool1); err != nil { - t.Fatalf("pool 2 internal state corrupted: %v", err) - } -} - // Tests that if the transaction count belonging to multiple accounts go above // some hard threshold, the higher transactions are dropped to prevent DOS // attacks. 
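The deleted one-by-one versus batch equivalency tests and the switch to addRemotesSync above follow from AddRemote now being a thin wrapper over the batch path, with promotions and their announcements gathered per reorg run instead of fired per transaction. A standalone sketch of that aggregation, with strings standing in for transactions and a print standing in for event.SendEvents:

package main

import "fmt"

// announce models how runReorg now notifies subsystems: instead of one event per
// promoted transaction (as the old promoteExecutables did), everything promoted in
// a reorg run is flattened into a single batched announcement.
func announce(promotedByName map[string][]string) {
	if len(promotedByName) == 0 {
		return
	}
	var batch []string
	for _, txs := range promotedByName {
		batch = append(batch, txs...)
	}
	fmt.Println("one NewTxs event carrying", len(batch), "transactions")
}

func main() {
	announce(map[string][]string{
		"alice": {"tx0", "tx1"},
		"bob":   {"tx5"},
	})
}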
@@ -1551,7 +1486,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { } } // Import the batch and verify that limits have been enforced - pool.AddRemotes(txs) + pool.addRemotesSync(txs) pending := 0 for _, list := range pool.pending { @@ -1595,7 +1530,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) { txs = append(txs, transaction(uint64(j), fname, tname, 1000000, fkey)) } // Import the batch and verify that limits have been enforced - pool.AddRemotes(txs) + pool.addRemotesSync(txs) if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -1643,7 +1578,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { } } // Import the batch and verify that limits have been enforced - pool.AddRemotes(txs) + pool.addRemotesSync(txs) for name, list := range pool.pending { if list.Len() != int(testTxPoolConfig.AccountSlots) { @@ -1711,7 +1646,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { if err := pool.AddLocal(pricedTransaction(2, localName, tname, 1000000, big.NewInt(1), local)); err != nil { t.Fatalf("failed to add local transaction: %v", err) } - if err := pool.AddRemote(pricedTransaction(0, remoteName, tname, 1000000, big.NewInt(1), remote)); err != nil { + if err := pool.addRemoteSync(pricedTransaction(0, remoteName, tname, 1000000, big.NewInt(1), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } pending, queued := pool.Stats() @@ -1749,7 +1684,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { } // Bump the nonce temporarily and ensure the newly invalidated transaction is removed manager.SetNonce(localName, 2) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) time.Sleep(2 * config.Rejournal) pool.Stop() @@ -1780,7 +1715,6 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // TestTransactionStatusCheck tests that the pool can correctly retrieve the // pending status of individual transactions. func TestTransactionStatusCheck(t *testing.T) { - // Create the pool to test the status retrievals with statedb, _ := state.New(common.Hash{}, state.NewDatabase(mdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 10000000, new(event.Feed)} @@ -1814,7 +1748,7 @@ func TestTransactionStatusCheck(t *testing.T) { txs = append(txs, pricedTransaction(2, accs[2], tname, 1000000, big.NewInt(1), keys[2])) // Queued only // Import the transaction and ensure they are correctly added - pool.AddRemotes(txs) + pool.addRemotesSync(txs) pending, queued := pool.Stats() if pending != 2 { @@ -1906,32 +1840,6 @@ func benchmarkFuturePromotion(b *testing.B, size int) { } } -// Benchmarks the speed of iterative transaction insertion. -func BenchmarkPoolInsert(b *testing.B) { - // Generate a batch of transactions to enqueue into the pool - var ( - fname = common.Name("fromname") - tname = common.Name("totestname") - assetID = uint64(0) - ) - pool, manager := setupTxPool(fname) - defer pool.Stop() - fkey := generateAccount(nil, fname, manager) - generateAccount(nil, tname, manager) - - pool.curAccountManager.AddAccountBalanceByID(fname, assetID, big.NewInt(1000000)) - - txs := make([]*types.Transaction, b.N) - for i := 0; i < b.N; i++ { - txs[i] = transaction(uint64(i), fname, tname, 100000, fkey) - } - // Benchmark importing the transactions into the queue - b.ResetTimer() - for _, tx := range txs { - pool.AddRemote(tx) - } -} - // Benchmarks the speed of batched transaction insertion. 
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) } func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) } @@ -1961,6 +1869,6 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) { // Benchmark importing the transactions into the queue b.ResetTimer() for _, batch := range batches { - pool.AddRemotes(batch) + pool.addRemotesSync(batch) } }
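Taken together, the benchmarks and tests now exercise an add path that validates under the pool mutex and defers promotion and announcement to the reorg loop: AddLocals and the test-only addRemotesSync wait for that run, while AddRemotes returns as soon as the request is queued. A standalone sketch of that sync/async split; the helper names here are illustrative, not the pool's API:

package main

import (
	"fmt"
	"time"
)

// addBatch models the new addTxs flow: hand the batch to a background "reorg"
// run and only block on its completion when sync is requested, mirroring
// AddLocals/addRemotesSync (sync) versus AddRemotes (async).
func addBatch(txs []string, sync bool, request func([]string) chan struct{}) {
	done := request(txs)
	if sync {
		<-done
	}
}

func main() {
	request := func(txs []string) chan struct{} {
		done := make(chan struct{})
		go func() {
			defer close(done)
			fmt.Println("promoted", len(txs), "txs")
		}()
		return done
	}
	addBatch([]string{"tx0", "tx1"}, true, request) // blocks until the run finished
	addBatch([]string{"tx2"}, false, request)       // fire and forget
	time.Sleep(10 * time.Millisecond)               // let the async run finish in this toy
}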