From 1fd3ccda3a9223e1df3bfdc00ced918c1222720b Mon Sep 17 00:00:00 2001 From: Jorge Silva Date: Tue, 26 Mar 2024 13:45:49 +0000 Subject: [PATCH 1/7] add GasPrice() to offledger interface, typedPoolByNonce doesn't need to be generic --- packages/chain/mempool/mempool.go | 11 ++-- packages/chain/mempool/typed_pool_by_nonce.go | 56 ++++++++++--------- .../chain/mempool/typed_pool_by_nonce_test.go | 4 +- packages/isc/request.go | 2 + packages/isc/request_evmcall.go | 5 ++ packages/isc/request_evmtx.go | 5 ++ packages/isc/request_offledger.go | 5 ++ 7 files changed, 57 insertions(+), 31 deletions(-) diff --git a/packages/chain/mempool/mempool.go b/packages/chain/mempool/mempool.go index ba31817b92..8479da5da9 100644 --- a/packages/chain/mempool/mempool.go +++ b/packages/chain/mempool/mempool.go @@ -136,7 +136,7 @@ type mempoolImpl struct { tangleTime time.Time timePool TimePool onLedgerPool RequestPool[isc.OnLedgerRequest] - offLedgerPool *TypedPoolByNonce[isc.OffLedgerRequest] + offLedgerPool *offLedgerPool distSync gpa.GPA chainHeadAO *isc.AliasOutputWithID chainHeadState state.State @@ -233,7 +233,7 @@ func New( tangleTime: time.Time{}, timePool: NewTimePool(metrics.SetTimePoolSize, log.Named("TIM")), onLedgerPool: NewTypedPool[isc.OnLedgerRequest](waitReq, metrics.SetOnLedgerPoolSize, metrics.SetOnLedgerReqTime, log.Named("ONL")), - offLedgerPool: NewTypedPoolByNonce[isc.OffLedgerRequest](waitReq, metrics.SetOffLedgerPoolSize, metrics.SetOffLedgerReqTime, log.Named("OFF")), + offLedgerPool: NewOffledgerPool(waitReq, metrics.SetOffLedgerPoolSize, metrics.SetOffLedgerReqTime, log.Named("OFF")), chainHeadAO: nil, serverNodesUpdatedPipe: pipe.NewInfinitePipe[*reqServerNodesUpdated](), serverNodes: []*cryptolib.PublicKey{}, @@ -545,6 +545,9 @@ func (mpi *mempoolImpl) shouldAddOffledgerRequest(req isc.OffLedgerRequest) erro return fmt.Errorf("no funds on chain") } } + + // TODO check gas price - must be equal or higher than the feePolicy + return nil } @@ -600,7 +603,7 @@ func (mpi *mempoolImpl) refsToPropose(consensusID consGR.ConsensusID) []*isc.Req }) } - mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry[isc.OffLedgerRequest]) { + mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry) { agentID, err := isc.AgentIDFromString(account) if err != nil { panic(fmt.Errorf("invalid agentID string: %s", err.Error())) @@ -906,7 +909,7 @@ func (mpi *mempoolImpl) handleRePublishTimeTick() { } func (mpi *mempoolImpl) handleForceCleanMempool() { - mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry[isc.OffLedgerRequest]) { + mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry) { for _, e := range entries { if time.Since(e.ts) > mpi.ttl && !lo.Some(mpi.consensusInstances, e.proposedFor) { mpi.log.Debugf("handleForceCleanMempool, request TTL expired, removing: %s", e.req.ID().String()) diff --git a/packages/chain/mempool/typed_pool_by_nonce.go b/packages/chain/mempool/typed_pool_by_nonce.go index d5b7dca220..c38b350f58 100644 --- a/packages/chain/mempool/typed_pool_by_nonce.go +++ b/packages/chain/mempool/typed_pool_by_nonce.go @@ -17,49 +17,55 @@ import ( ) // keeps a map of requests ordered by nonce for each account -type TypedPoolByNonce[V isc.OffLedgerRequest] struct { +type offLedgerPool struct { waitReq WaitReq - refLUT *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *OrderedPoolEntry[V]] + refLUT *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *OrderedPoolEntry] // reqsByAcountOrdered keeps an ordered map of 
reqsByAcountOrdered for each account by nonce - reqsByAcountOrdered *shrinkingmap.ShrinkingMap[string, []*OrderedPoolEntry[V]] // string is isc.AgentID.String() + reqsByAcountOrdered *shrinkingmap.ShrinkingMap[string, []*OrderedPoolEntry] // string is isc.AgentID.String() sizeMetric func(int) timeMetric func(time.Duration) log *logger.Logger } -func NewTypedPoolByNonce[V isc.OffLedgerRequest](waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) *TypedPoolByNonce[V] { - return &TypedPoolByNonce[V]{ +func NewOffledgerPool(waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) *offLedgerPool { + return &offLedgerPool{ waitReq: waitReq, - reqsByAcountOrdered: shrinkingmap.New[string, []*OrderedPoolEntry[V]](), - refLUT: shrinkingmap.New[isc.RequestRefKey, *OrderedPoolEntry[V]](), + reqsByAcountOrdered: shrinkingmap.New[string, []*OrderedPoolEntry](), + refLUT: shrinkingmap.New[isc.RequestRefKey, *OrderedPoolEntry](), sizeMetric: sizeMetric, timeMetric: timeMetric, log: log, } } -type OrderedPoolEntry[V isc.OffLedgerRequest] struct { - req V +type OrderedPoolEntry struct { + req isc.OffLedgerRequest old bool ts time.Time proposedFor []consGR.ConsensusID } -func (p *TypedPoolByNonce[V]) Has(reqRef *isc.RequestRef) bool { +func (p *offLedgerPool) Has(reqRef *isc.RequestRef) bool { return p.refLUT.Has(reqRef.AsKey()) } -func (p *TypedPoolByNonce[V]) Get(reqRef *isc.RequestRef) V { +func (p *offLedgerPool) Get(reqRef *isc.RequestRef) isc.OffLedgerRequest { entry, exists := p.refLUT.Get(reqRef.AsKey()) if !exists { - return *new(V) + return isc.OffLedgerRequest(nil) } return entry.req } -func (p *TypedPoolByNonce[V]) Add(request V) { +func (p *offLedgerPool) Add(request isc.OffLedgerRequest) { + // TODO keep an ordered list by gas price + + // TODO drop the tx with the lowest price if the total number of requests is too big + + // TODO apply a similar limit to on-ledger requests + ref := isc.RequestRefFromRequest(request) - entry := &OrderedPoolEntry[V]{req: request, ts: time.Now()} + entry := &OrderedPoolEntry{req: request, ts: time.Now()} account := request.SenderAccount().String() if !p.refLUT.Set(ref.AsKey(), entry) { @@ -76,7 +82,7 @@ func (p *TypedPoolByNonce[V]) Add(request V) { reqsForAcount, exists := p.reqsByAcountOrdered.Get(account) if !exists { // no other requests for this account - p.reqsByAcountOrdered.Set(account, []*OrderedPoolEntry[V]{entry}) + p.reqsByAcountOrdered.Set(account, []*OrderedPoolEntry{entry}) return } @@ -84,7 +90,7 @@ func (p *TypedPoolByNonce[V]) Add(request V) { // find the index where the new entry should be added index, exists := slices.BinarySearchFunc(reqsForAcount, entry, - func(a, b *OrderedPoolEntry[V]) int { + func(a, b *OrderedPoolEntry) int { aNonce := a.req.Nonce() bNonce := b.req.Nonce() if aNonce == bNonce { @@ -112,7 +118,7 @@ func (p *TypedPoolByNonce[V]) Add(request V) { p.reqsByAcountOrdered.Set(account, reqsForAcount) } -func (p *TypedPoolByNonce[V]) Remove(request V) { +func (p *offLedgerPool) Remove(request isc.OffLedgerRequest) { refKey := isc.RequestRefFromRequest(request).AsKey() entry, exists := p.refLUT.Get(refKey) if !exists { @@ -132,7 +138,7 @@ func (p *TypedPoolByNonce[V]) Remove(request V) { return } // find the request in the accounts map - indexToDel := slices.IndexFunc(reqsByAccount, func(e *OrderedPoolEntry[V]) bool { + indexToDel := slices.IndexFunc(reqsByAccount, func(e *OrderedPoolEntry) bool { return refKey == isc.RequestRefFromRequest(e.req).AsKey() }) 
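	// The lookup above is by request ref rather than by a nonce-based binary
	// search: Add() keeps superseded same-nonce entries in the slice (it only
	// marks them "old"), so two entries can share a nonce and a binary search
	// on Nonce() could land on the wrong one.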
if indexToDel == -1 { @@ -148,15 +154,15 @@ func (p *TypedPoolByNonce[V]) Remove(request V) { p.reqsByAcountOrdered.Set(account, reqsByAccount) } -func (p *TypedPoolByNonce[V]) Iterate(f func(account string, requests []*OrderedPoolEntry[V])) { - p.reqsByAcountOrdered.ForEach(func(acc string, entries []*OrderedPoolEntry[V]) bool { +func (p *offLedgerPool) Iterate(f func(account string, requests []*OrderedPoolEntry)) { + p.reqsByAcountOrdered.ForEach(func(acc string, entries []*OrderedPoolEntry) bool { f(acc, slices.Clone(entries)) return true }) } -func (p *TypedPoolByNonce[V]) Filter(predicate func(request V, ts time.Time) bool) { - p.refLUT.ForEach(func(refKey isc.RequestRefKey, entry *OrderedPoolEntry[V]) bool { +func (p *offLedgerPool) Filter(predicate func(request isc.OffLedgerRequest, ts time.Time) bool) { + p.refLUT.ForEach(func(refKey isc.RequestRefKey, entry *OrderedPoolEntry) bool { if !predicate(entry.req, entry.ts) { p.Remove(entry.req) } @@ -165,12 +171,12 @@ func (p *TypedPoolByNonce[V]) Filter(predicate func(request V, ts time.Time) boo p.sizeMetric(p.refLUT.Size()) } -func (p *TypedPoolByNonce[V]) StatusString() string { +func (p *offLedgerPool) StatusString() string { return fmt.Sprintf("{|req|=%d}", p.refLUT.Size()) } -func (p *TypedPoolByNonce[V]) WriteContent(w io.Writer) { - p.reqsByAcountOrdered.ForEach(func(_ string, list []*OrderedPoolEntry[V]) bool { +func (p *offLedgerPool) WriteContent(w io.Writer) { + p.reqsByAcountOrdered.ForEach(func(_ string, list []*OrderedPoolEntry) bool { for _, entry := range list { jsonData, err := isc.RequestToJSON(entry.req) if err != nil { diff --git a/packages/chain/mempool/typed_pool_by_nonce_test.go b/packages/chain/mempool/typed_pool_by_nonce_test.go index 85ef62afd8..8bbb96d291 100644 --- a/packages/chain/mempool/typed_pool_by_nonce_test.go +++ b/packages/chain/mempool/typed_pool_by_nonce_test.go @@ -14,7 +14,7 @@ import ( func TestSomething(t *testing.T) { waitReq := NewWaitReq(waitRequestCleanupEvery) - pool := NewTypedPoolByNonce[isc.OffLedgerRequest](waitReq, func(int) {}, func(time.Duration) {}, testlogger.NewSilentLogger("", true)) + pool := NewOffledgerPool(waitReq, func(int) {}, func(time.Duration) {}, testlogger.NewSilentLogger("", true)) // generate a bunch of requests for the same account kp, addr := testkey.GenKeyAddr() @@ -41,7 +41,7 @@ func TestSomething(t *testing.T) { require.Len(t, reqsInPoolForAccount, 4) // try to remove everything during iteration - pool.Iterate(func(account string, entries []*OrderedPoolEntry[isc.OffLedgerRequest]) { + pool.Iterate(func(account string, entries []*OrderedPoolEntry) { for _, e := range entries { pool.Remove(e.req) } diff --git a/packages/isc/request.go b/packages/isc/request.go index ed4fde83ec..05a3e18926 100644 --- a/packages/isc/request.go +++ b/packages/isc/request.go @@ -3,6 +3,7 @@ package isc import ( "fmt" "io" + "math/big" "time" "github.com/ethereum/go-ethereum" @@ -80,6 +81,7 @@ type OffLedgerRequest interface { ChainID() ChainID Nonce() uint64 VerifySignature() error + GasPrice() (price *big.Int, specified bool) } type OnLedgerRequest interface { diff --git a/packages/isc/request_evmcall.go b/packages/isc/request_evmcall.go index 67f6b51027..7a92242e5b 100644 --- a/packages/isc/request_evmcall.go +++ b/packages/isc/request_evmcall.go @@ -126,3 +126,8 @@ func (req *evmOffLedgerCallRequest) VerifySignature() error { func (req *evmOffLedgerCallRequest) EVMCallMsg() *ethereum.CallMsg { return &req.callMsg } + +// GasPrice implements OffLedgerRequest. 
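// The second return value reports whether the sender actually specified a
// price, so callers can tell "no price" apart from an explicit zero (compare
// OffLedgerRequestData.GasPrice, which returns false). An illustrative
// admission check, with minPrice a placeholder threshold:
//
//	if price, specified := req.GasPrice(); specified && price.Cmp(minPrice) < 0 {
//		return fmt.Errorf("gas price too low, minimum is %s", minPrice)
//	}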
+func (req *evmOffLedgerCallRequest) GasPrice() (price *big.Int, specified bool) { + return req.callMsg.GasPrice, true +} diff --git a/packages/isc/request_evmtx.go b/packages/isc/request_evmtx.go index 7a094d400d..5e6543b246 100644 --- a/packages/isc/request_evmtx.go +++ b/packages/isc/request_evmtx.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math/big" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" @@ -153,3 +154,7 @@ func (req *evmOffLedgerTxRequest) VerifySignature() error { func (req *evmOffLedgerTxRequest) EVMCallMsg() *ethereum.CallMsg { return EVMCallDataFromTx(req.tx) } + +func (req *evmOffLedgerTxRequest) GasPrice() (price *big.Int, specified bool) { + return req.tx.GasPrice(), true +} diff --git a/packages/isc/request_offledger.go b/packages/isc/request_offledger.go index faaf29efe0..fc50095f1e 100644 --- a/packages/isc/request_offledger.go +++ b/packages/isc/request_offledger.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "math/big" "time" "github.com/ethereum/go-ethereum" @@ -271,3 +272,7 @@ func (req *OffLedgerRequestData) WithSender(sender *cryptolib.PublicKey) Unsigne } return req } + +func (req *OffLedgerRequestData) GasPrice() (price *big.Int, specified bool) { + return big.NewInt(0), false +} From be2564d57b28c2b9acefebadc96f6a878a8bf222 Mon Sep 17 00:00:00 2001 From: Jorge Silva Date: Tue, 2 Apr 2024 10:04:59 +0100 Subject: [PATCH 2/7] limit offledger mempool by gas price --- components/chains/params.go | 6 + packages/chain/mempool/mempool.go | 48 +-- packages/chain/mempool/offledger_pool.go | 297 ++++++++++++++++++ packages/chain/mempool/offledger_pool_test.go | 99 ++++++ packages/chain/mempool/time_pool.go | 34 +- packages/chain/mempool/time_pool_test.go | 24 +- packages/chain/mempool/typed_pool.go | 6 +- packages/chain/mempool/typed_pool_by_nonce.go | 196 ------------ .../chain/mempool/typed_pool_by_nonce_test.go | 51 --- packages/isc/request.go | 3 +- packages/isc/request_evmcall.go | 4 +- packages/isc/request_evmtx.go | 8 +- packages/isc/request_offledger.go | 9 +- packages/testutil/dummyrequest.go | 29 ++ packages/testutil/run_heavy_tests_false.go | 1 - packages/vm/core/governance/stateaccess.go | 7 + 16 files changed, 520 insertions(+), 302 deletions(-) create mode 100644 packages/chain/mempool/offledger_pool.go create mode 100644 packages/chain/mempool/offledger_pool_test.go delete mode 100644 packages/chain/mempool/typed_pool_by_nonce.go delete mode 100644 packages/chain/mempool/typed_pool_by_nonce_test.go diff --git a/components/chains/params.go b/components/chains/params.go index 2d55290b53..261de5d559 100644 --- a/components/chains/params.go +++ b/components/chains/params.go @@ -20,6 +20,12 @@ type ParametersChains struct { ConsensusInstsInAdvance int `default:"3" usage:""` AwaitReceiptCleanupEvery int `default:"100" usage:"for every this number AwaitReceipt will be cleaned up"` MempoolTTL time.Duration `default:"24h" usage:"Time that requests are allowed to sit in the mempool without being processed"` + MempoolMaxOffledgerInPool time.Duration `default:"10000" usage:"Maximum number of off-ledger requests kept in the mempool"` + MempoolMaxOnledgerInPool time.Duration `default:"2000" usage:"Maximum number of on-ledger requests kept in the mempool"` + MempoolMaxTimedInPool time.Duration `default:"500" usage:"Maximum number of timed on-ledger requests kept in the mempool"` + MempoolMaxOnledgerToPropose time.Duration `default:"200" usage:"Maximum number of offledger requests to propose for the next block"` + 
MempoolMaxOffledgerToPropose time.Duration `default:"100" usage:"Maximum number of on-ledger requests to propose for the next block"` + MempoolMAxTimedToPropose time.Duration `default:"100" usage:"Maximum number of timed on-ledger requests to propose for the next block"` } type ParametersWAL struct { diff --git a/packages/chain/mempool/mempool.go b/packages/chain/mempool/mempool.go index 8479da5da9..6fa416938c 100644 --- a/packages/chain/mempool/mempool.go +++ b/packages/chain/mempool/mempool.go @@ -119,7 +119,7 @@ type RequestPool[V isc.Request] interface { Add(request V) Remove(request V) // this removes requests from the pool if predicate returns false - Filter(predicate func(request V, ts time.Time) bool) + Cleanup(predicate func(request V, ts time.Time) bool) Iterate(f func(e *typedPoolEntry[V])) StatusString() string WriteContent(io.Writer) @@ -134,8 +134,8 @@ type RequestPool[V isc.Request] interface { type mempoolImpl struct { chainID isc.ChainID tangleTime time.Time - timePool TimePool - onLedgerPool RequestPool[isc.OnLedgerRequest] + timePool TimePool // TODO limit this pool + onLedgerPool RequestPool[isc.OnLedgerRequest] // TODO limit this pool offLedgerPool *offLedgerPool distSync gpa.GPA chainHeadAO *isc.AliasOutputWithID @@ -214,6 +214,9 @@ type reqTrackNewChainHead struct { responseCh chan<- bool // only for tests, shouldn't be used in the chain package } +// TODO make these configurable +const MaxOffLedgerPoolSize = 1000 + func New( ctx context.Context, chainID isc.ChainID, @@ -233,7 +236,7 @@ func New( tangleTime: time.Time{}, timePool: NewTimePool(metrics.SetTimePoolSize, log.Named("TIM")), onLedgerPool: NewTypedPool[isc.OnLedgerRequest](waitReq, metrics.SetOnLedgerPoolSize, metrics.SetOnLedgerReqTime, log.Named("ONL")), - offLedgerPool: NewOffledgerPool(waitReq, metrics.SetOffLedgerPoolSize, metrics.SetOffLedgerReqTime, log.Named("OFF")), + offLedgerPool: NewOffledgerPool(MaxOffLedgerPoolSize, waitReq, metrics.SetOffLedgerPoolSize, metrics.SetOffLedgerReqTime, log.Named("OFF")), chainHeadAO: nil, serverNodesUpdatedPipe: pipe.NewInfinitePipe[*reqServerNodesUpdated](), serverNodes: []*cryptolib.PublicKey{}, @@ -546,7 +549,10 @@ func (mpi *mempoolImpl) shouldAddOffledgerRequest(req isc.OffLedgerRequest) erro } } - // TODO check gas price - must be equal or higher than the feePolicy + // reject txs with gas price too low + if gp := req.GasPrice(); gp != nil && gp.Cmp(mpi.offLedgerPool.minGasPrice) == -1 { + return fmt.Errorf("gas price too low. Must be at least %s", mpi.offLedgerPool.minGasPrice.String()) + } return nil } @@ -588,11 +594,15 @@ func (mpi *mempoolImpl) handleConsensusProposal(recv *reqConsensusProposal) { } func (mpi *mempoolImpl) refsToPropose(consensusID consGR.ConsensusID) []*isc.RequestRef { + // TODO add a limit of N requests to propose. + + // TODO change to propose the "top priced requests" + // // The case for matching ChainHeadAO and request BaseAO reqRefs := []*isc.RequestRef{} if !mpi.tangleTime.IsZero() { // Wait for tangle-time to process the on ledger requests. - mpi.onLedgerPool.Filter(func(request isc.OnLedgerRequest, _ time.Time) bool { + mpi.onLedgerPool.Cleanup(func(request isc.OnLedgerRequest, _ time.Time) bool { if isc.RequestIsExpired(request, mpi.tangleTime) { return false // Drop it from the mempool } @@ -772,17 +782,9 @@ func (mpi *mempoolImpl) handleTangleTimeUpdated(tangleTime time.Time) { // // Add requests from time locked pool. 
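	// TimePool is narrowed to isc.OnLedgerRequest in this patch (see the
	// time_pool.go hunks below), so the old per-type switch is gone:
	// everything released by TakeTill goes straight into the on-ledger pool.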
reqs := mpi.timePool.TakeTill(tangleTime) - for i := range reqs { - switch req := reqs[i].(type) { - case isc.OnLedgerRequest: - mpi.onLedgerPool.Add(req) - mpi.metrics.IncRequestsReceived(req) - case isc.OffLedgerRequest: - mpi.offLedgerPool.Add(req) - mpi.metrics.IncRequestsReceived(req) - default: - panic(fmt.Errorf("unexpected request type: %T, %+v", req, req)) - } + for _, req := range reqs { + mpi.onLedgerPool.Add(req) + mpi.metrics.IncRequestsReceived(req) } // // Notify existing on-ledger requests if that's first time update. @@ -800,6 +802,10 @@ func (mpi *mempoolImpl) handleTangleTimeUpdated(tangleTime time.Time) { func (mpi *mempoolImpl) handleTrackNewChainHead(req *reqTrackNewChainHead) { defer close(req.responseCh) mpi.log.Debugf("handleTrackNewChainHead, %v from %v, current=%v", req.till, req.from, mpi.chainHeadAO) + + // update defaultGasPrice for offLedger requests + mpi.offLedgerPool.SetMinGasPrice(governance.NewStateAccess(mpi.chainHeadState).DefaultGasPrice()) + if len(req.removed) != 0 { mpi.log.Infof("Reorg detected, removing %v blocks, adding %v blocks", len(req.removed), len(req.added)) // TODO: For IOTA 2.0: Maybe re-read the state from L1 (when reorgs will become possible). @@ -900,7 +906,7 @@ func (mpi *mempoolImpl) handleRePublishTimeTick() { return // re-broadcasting is disabled } retryOlder := time.Now().Add(-mpi.broadcastInterval) - mpi.offLedgerPool.Filter(func(request isc.OffLedgerRequest, ts time.Time) bool { + mpi.offLedgerPool.Cleanup(func(request isc.OffLedgerRequest, ts time.Time) bool { if ts.Before(retryOlder) { mpi.sendMessages(mpi.distSync.Input(distsync.NewInputPublishRequest(request))) } @@ -951,9 +957,9 @@ func (mpi *mempoolImpl) tryRemoveRequest(req isc.Request) { } func (mpi *mempoolImpl) tryCleanupProcessed(chainState state.State) { - mpi.onLedgerPool.Filter(unprocessedPredicate[isc.OnLedgerRequest](chainState, mpi.log)) - mpi.offLedgerPool.Filter(unprocessedPredicate[isc.OffLedgerRequest](chainState, mpi.log)) - mpi.timePool.Filter(unprocessedPredicate[isc.Request](chainState, mpi.log)) + mpi.onLedgerPool.Cleanup(unprocessedPredicate[isc.OnLedgerRequest](chainState, mpi.log)) + mpi.offLedgerPool.Cleanup(unprocessedPredicate[isc.OffLedgerRequest](chainState, mpi.log)) + mpi.timePool.Cleanup(unprocessedPredicate[isc.OnLedgerRequest](chainState, mpi.log)) } func (mpi *mempoolImpl) sendMessages(outMsgs gpa.OutMessages) { diff --git a/packages/chain/mempool/offledger_pool.go b/packages/chain/mempool/offledger_pool.go new file mode 100644 index 0000000000..4e142e468e --- /dev/null +++ b/packages/chain/mempool/offledger_pool.go @@ -0,0 +1,297 @@ +// Copyright 2020 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +package mempool + +import ( + "fmt" + "io" + "math/big" + "slices" + "time" + + "github.com/iotaledger/hive.go/ds/shrinkingmap" + "github.com/iotaledger/hive.go/logger" + consGR "github.com/iotaledger/wasp/packages/chain/cons/cons_gr" + "github.com/iotaledger/wasp/packages/isc" + "github.com/iotaledger/wasp/packages/kv/codec" +) + +// keeps a map of requests ordered by nonce for each account +type offLedgerPool struct { + waitReq WaitReq + refLUT *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *OrderedPoolEntry] + // reqsByAcountOrdered keeps an ordered map of reqsByAcountOrdered for each account by nonce + reqsByAcountOrdered *shrinkingmap.ShrinkingMap[string, []*OrderedPoolEntry] // string is isc.AgentID.String() + // orderedByGasPrice keeps a list ordered by the highest gas price + orderedByGasPrice []*OrderedPoolEntry // TODO use a 
better data structure instead!!! (probably RedBlackTree) + minGasPrice *big.Int + maxPoolSize int + sizeMetric func(int) + timeMetric func(time.Duration) + log *logger.Logger +} + +func NewOffledgerPool(maxPoolSize int, waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) *offLedgerPool { + return &offLedgerPool{ + waitReq: waitReq, + refLUT: shrinkingmap.New[isc.RequestRefKey, *OrderedPoolEntry](), + reqsByAcountOrdered: shrinkingmap.New[string, []*OrderedPoolEntry](), + orderedByGasPrice: []*OrderedPoolEntry{}, + minGasPrice: big.NewInt(1), + maxPoolSize: maxPoolSize, + sizeMetric: sizeMetric, + timeMetric: timeMetric, + log: log, + } +} + +type OrderedPoolEntry struct { + req isc.OffLedgerRequest + old bool + ts time.Time + proposedFor []consGR.ConsensusID +} + +func (p *offLedgerPool) Has(reqRef *isc.RequestRef) bool { + return p.refLUT.Has(reqRef.AsKey()) +} + +func (p *offLedgerPool) Get(reqRef *isc.RequestRef) isc.OffLedgerRequest { + entry, exists := p.refLUT.Get(reqRef.AsKey()) + if !exists { + return isc.OffLedgerRequest(nil) + } + return entry.req +} + +func (p *offLedgerPool) Add(request isc.OffLedgerRequest) { + ref := isc.RequestRefFromRequest(request) + entry := &OrderedPoolEntry{req: request, ts: time.Now()} + account := request.SenderAccount().String() + + // + // add the request to the "request ref" Lookup Table + if !p.refLUT.Set(ref.AsKey(), entry) { + p.log.Debugf("NOT ADDED, already exists. reqID: %v as key=%v, senderAccount: ", request.ID(), ref, account) + return // not added already exists + } + + // update metrics and signal that the request is available, once this function ends + defer func() { + p.log.Debugf("ADD %v as key=%v, senderAccount: %s", request.ID(), ref, account) + p.sizeMetric(p.refLUT.Size()) + p.waitReq.MarkAvailable(request) + }() + + // + // add to the account requests, keep the slice ordered + { + reqsForAcount, exists := p.reqsByAcountOrdered.Get(account) + if !exists { + // no other requests for this account + p.reqsByAcountOrdered.Set(account, []*OrderedPoolEntry{entry}) + } else { + // find the index where the new entry should be added + index, exists := slices.BinarySearchFunc(reqsForAcount, entry, + func(a, b *OrderedPoolEntry) int { + aNonce := a.req.Nonce() + bNonce := b.req.Nonce() + if aNonce == bNonce { + return 0 + } + if aNonce > bNonce { + return 1 + } + return -1 + }, + ) + if exists { + // same nonce, mark the existing request with overlapping nonce as "old", place the new one + // NOTE: do not delete the request here, as it might already be part of an on-going consensus round + reqsForAcount[index].old = true + } + + reqsForAcount = append(reqsForAcount, entry) // add to the end of the list (thus extending the array) + + // make room if target position is not at the end + if index != len(reqsForAcount)+1 { + copy(reqsForAcount[index+1:], reqsForAcount[index:]) + reqsForAcount[index] = entry + } + p.reqsByAcountOrdered.Set(account, reqsForAcount) + } + } + + // + // add the to the ordered list of requests by gas price + { + index, _ := slices.BinarySearchFunc(p.orderedByGasPrice, entry, p.reqSort) + p.orderedByGasPrice = append(p.orderedByGasPrice, entry) + // make room if target position is not at the end + if index != len(p.orderedByGasPrice) { + copy(p.orderedByGasPrice[index+1:], p.orderedByGasPrice[index:]) + p.orderedByGasPrice[index] = entry + } + } + + p.LimitPoolSize() +} + +// LimitPoolSize drops the txs with the lowest price if the total number of requests is too big +func (p 
*offLedgerPool) LimitPoolSize() { + // TODO apply a similar limit to on-ledger/time pool (it cannot be unbound) + if len(p.orderedByGasPrice) <= p.maxPoolSize { + return // nothing to do + } + + totalToDelete := len(p.orderedByGasPrice) - p.maxPoolSize + reqsToDelete := make([]*OrderedPoolEntry, totalToDelete) + j := 0 + for i := 0; i < len(p.orderedByGasPrice); i++ { + if len(p.orderedByGasPrice[i].proposedFor) > 0 { + continue // we cannot drop requests that are being used in current consensus instances + } + reqsToDelete[j] = p.orderedByGasPrice[i] + if j >= totalToDelete-1 { + break + } + } + + for _, r := range reqsToDelete { + p.Remove(r.req) + } +} + +func (p *offLedgerPool) GasPrice(e *OrderedPoolEntry) *big.Int { + price := e.req.GasPrice() + if price != nil { + return price + } + // requests without a price specified are assigned the minimum gas price + return p.minGasPrice +} + +func (p *offLedgerPool) SetMinGasPrice(newPrice *big.Int) { + if p.minGasPrice.Cmp(newPrice) == 0 { + // no change + return + } + // update the price and re-order the transactions + p.minGasPrice = newPrice + slices.SortFunc(p.orderedByGasPrice, p.reqSort) +} + +func (p *offLedgerPool) reqSort(a, b *OrderedPoolEntry) int { + cmp := p.GasPrice(a).Cmp(p.GasPrice(b)) + if cmp != 0 { + return cmp + } + // use requestID as a fallback in case of matching gas price (it's random and should give roughly the same order between nodes) + aID := a.req.ID() + bID := b.req.ID() + for i := range aID { + if aID[i] == bID[i] { + continue + } + if aID[i] > bID[i] { + return 1 + } + return -1 + } + return 0 +} + +func (p *offLedgerPool) Remove(request isc.OffLedgerRequest) { + refKey := isc.RequestRefFromRequest(request).AsKey() + entry, exists := p.refLUT.Get(refKey) + if !exists { + return // does not exist + } + defer func() { + p.sizeMetric(p.refLUT.Size()) + p.timeMetric(time.Since(entry.ts)) + }() + + // + // delete from the "requests reference" LookupTable + if p.refLUT.Delete(refKey) { + p.log.Debugf("DEL %v as key=%v", request.ID(), refKey) + } + + // + // find the request in the accounts map and delete it + { + account := entry.req.SenderAccount().String() + reqsByAccount, exists := p.reqsByAcountOrdered.Get(account) + if !exists { + p.log.Error("inconsistency trying to DEL %v as key=%v, no request list for account %s", request.ID(), refKey, account) + return + } + indexToDel := slices.IndexFunc(reqsByAccount, func(e *OrderedPoolEntry) bool { + return refKey == isc.RequestRefFromRequest(e.req).AsKey() + }) + if indexToDel == -1 { + p.log.Error("inconsistency trying to DEL %v as key=%v, request not found in list for account %s", request.ID(), refKey, account) + return + } + if len(reqsByAccount) == 1 { // just remove the entire array for the account + p.reqsByAcountOrdered.Delete(account) + } else { + reqsByAccount[indexToDel] = nil // remove the pointer reference to allow GC of the entry object + reqsByAccount = slices.Delete(reqsByAccount, indexToDel, indexToDel+1) + p.reqsByAcountOrdered.Set(account, reqsByAccount) + } + } + + // + // find and delete the request from the gas price ordered list + { + indexToDel := slices.IndexFunc(p.orderedByGasPrice, func(e *OrderedPoolEntry) bool { + return refKey == isc.RequestRefFromRequest(e.req).AsKey() + }) + p.orderedByGasPrice[indexToDel] = nil // remove the pointer reference to allow GC of the entry object + p.orderedByGasPrice = slices.Delete(p.orderedByGasPrice, indexToDel, indexToDel+1) + } +} + +func (p *offLedgerPool) Iterate(f func(account string, requests 
[]*OrderedPoolEntry)) { + p.reqsByAcountOrdered.ForEach(func(acc string, entries []*OrderedPoolEntry) bool { + f(acc, slices.Clone(entries)) + return true + }) +} + +func (p *offLedgerPool) Cleanup(predicate func(request isc.OffLedgerRequest, ts time.Time) bool) { + p.refLUT.ForEach(func(refKey isc.RequestRefKey, entry *OrderedPoolEntry) bool { + if !predicate(entry.req, entry.ts) { + p.Remove(entry.req) + } + return true + }) + p.sizeMetric(p.refLUT.Size()) +} + +func (p *offLedgerPool) StatusString() string { + return fmt.Sprintf("{|req|=%d}", p.refLUT.Size()) +} + +func (p *offLedgerPool) WriteContent(w io.Writer) { + p.reqsByAcountOrdered.ForEach(func(_ string, list []*OrderedPoolEntry) bool { + for _, entry := range list { + jsonData, err := isc.RequestToJSON(entry.req) + if err != nil { + return false // stop iteration + } + _, err = w.Write(codec.EncodeUint32(uint32(len(jsonData)))) + if err != nil { + return false // stop iteration + } + _, err = w.Write(jsonData) + if err != nil { + return false // stop iteration + } + } + return true + }) +} diff --git a/packages/chain/mempool/offledger_pool_test.go b/packages/chain/mempool/offledger_pool_test.go new file mode 100644 index 0000000000..e21d00535b --- /dev/null +++ b/packages/chain/mempool/offledger_pool_test.go @@ -0,0 +1,99 @@ +package mempool + +import ( + "math/big" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/require" + + "github.com/iotaledger/wasp/packages/isc" + "github.com/iotaledger/wasp/packages/testutil" + "github.com/iotaledger/wasp/packages/testutil/testkey" + "github.com/iotaledger/wasp/packages/testutil/testlogger" +) + +func TestOffledgerMempoolAccountNonce(t *testing.T) { + waitReq := NewWaitReq(waitRequestCleanupEvery) + pool := NewOffledgerPool(100, waitReq, func(int) {}, func(time.Duration) {}, testlogger.NewSilentLogger("", true)) + + // generate a bunch of requests for the same account + kp, addr := testkey.GenKeyAddr() + agentID := isc.NewAgentID(addr) + + req0 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 0, kp) + req1 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 1, kp) + req2 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 2, kp) + req2new := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 2, kp) + pool.Add(req0) + pool.Add(req1) + pool.Add(req1) // try to add the same request many times + pool.Add(req2) + pool.Add(req1) + require.EqualValues(t, 3, pool.refLUT.Size()) + require.EqualValues(t, 1, pool.reqsByAcountOrdered.Size()) + reqsInPoolForAccount, _ := pool.reqsByAcountOrdered.Get(agentID.String()) + require.Len(t, reqsInPoolForAccount, 3) + pool.Add(req2new) + pool.Add(req2new) + require.EqualValues(t, 4, pool.refLUT.Size()) + require.EqualValues(t, 1, pool.reqsByAcountOrdered.Size()) + reqsInPoolForAccount, _ = pool.reqsByAcountOrdered.Get(agentID.String()) + require.Len(t, reqsInPoolForAccount, 4) + + // try to remove everything during iteration + pool.Iterate(func(account string, entries []*OrderedPoolEntry) { + for _, e := range entries { + pool.Remove(e.req) + } + }) + require.EqualValues(t, 0, pool.refLUT.Size()) + require.EqualValues(t, 0, pool.reqsByAcountOrdered.Size()) +} + +func TestOffledgerMempoolLimit(t *testing.T) { + waitReq := NewWaitReq(waitRequestCleanupEvery) + poolSizeLimit := 3 + pool := NewOffledgerPool(poolSizeLimit, waitReq, func(int) {}, func(time.Duration) {}, testlogger.NewSilentLogger("", true)) + + // create requests with different gas prices + req0 := 
testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(1)) + req1 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(2)) + req2 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(3)) + pool.Add(req0) + pool.Add(req1) + pool.Add(req2) + + assertPoolSize := func() { + require.EqualValues(t, 3, pool.refLUT.Size()) + require.Len(t, pool.orderedByGasPrice, 3) + require.EqualValues(t, 3, pool.reqsByAcountOrdered.Size()) + } + contains := func(reqs ...isc.OffLedgerRequest) { + for _, req := range reqs { + lo.ContainsBy(pool.orderedByGasPrice, func(e *OrderedPoolEntry) bool { + return e.req.ID().Equals(req.ID()) + }) + } + } + + assertPoolSize() + + // add a request with high + req3 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(3)) + pool.Add(req3) + assertPoolSize() + contains(req1, req2, req3) // assert req3 was added and req0 was removed + + req4 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(1)) + pool.Add(req4) + assertPoolSize() + contains(req1, req2, req3) // assert req4 is not added + + req5 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(4)) + pool.Add(req5) + assertPoolSize() + + contains(req2, req3, req5) // assert req5 was added and req1 was removed +} diff --git a/packages/chain/mempool/time_pool.go b/packages/chain/mempool/time_pool.go index f6fbc74c21..2a171f9ff5 100644 --- a/packages/chain/mempool/time_pool.go +++ b/packages/chain/mempool/time_pool.go @@ -12,20 +12,22 @@ import ( "github.com/iotaledger/wasp/packages/isc" ) +// TODO limit this (sort by timelock) + // Maintains a pool of requests that have to be postponed until specified timestamp. type TimePool interface { - AddRequest(timestamp time.Time, request isc.Request) - TakeTill(timestamp time.Time) []isc.Request + AddRequest(timestamp time.Time, request isc.OnLedgerRequest) + TakeTill(timestamp time.Time) []isc.OnLedgerRequest Has(reqID *isc.RequestRef) bool - Filter(predicate func(request isc.Request, ts time.Time) bool) + Cleanup(predicate func(request isc.OnLedgerRequest, ts time.Time) bool) } // Here we implement TimePool. We maintain the request in a list ordered by a timestamp. // The list is organized in slots. Each slot contains a list of requests that fit to the // slot boundaries. type timePoolImpl struct { - requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.Request] // All the requests in this pool. - slots *timeSlot // Structure to fetch them fast by their time. + requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.OnLedgerRequest] // All the requests in this pool. + slots *timeSlot // Structure to fetch them fast by their time. 
sizeMetric func(int) log *logger.Logger } @@ -33,7 +35,7 @@ type timePoolImpl struct { type timeSlot struct { from time.Time till time.Time - reqs *shrinkingmap.ShrinkingMap[time.Time, []isc.Request] + reqs *shrinkingmap.ShrinkingMap[time.Time, []isc.OnLedgerRequest] next *timeSlot } @@ -43,14 +45,14 @@ var _ TimePool = &timePoolImpl{} func NewTimePool(sizeMetric func(int), log *logger.Logger) TimePool { return &timePoolImpl{ - requests: shrinkingmap.New[isc.RequestRefKey, isc.Request](), + requests: shrinkingmap.New[isc.RequestRefKey, isc.OnLedgerRequest](), slots: nil, sizeMetric: sizeMetric, log: log, } } -func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.Request) { +func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.OnLedgerRequest) { reqRefKey := isc.RequestRefFromRequest(request).AsKey() if tpi.requests.Has(reqRefKey) { @@ -66,8 +68,8 @@ func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.Request) { prevNext := &tpi.slots for slot := tpi.slots; ; { if slot == nil || slot.from.After(reqFrom) { // Add new slot (append or insert). - newRequests := shrinkingmap.New[time.Time, []isc.Request]() - newRequests.Set(timestamp, []isc.Request{request}) + newRequests := shrinkingmap.New[time.Time, []isc.OnLedgerRequest]() + newRequests.Set(timestamp, []isc.OnLedgerRequest{request}) newSlot := &timeSlot{ from: reqFrom, @@ -79,7 +81,7 @@ func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.Request) { return } if slot.from == reqFrom { // Add to existing slot. - requests, _ := slot.reqs.GetOrCreate(timestamp, func() []isc.Request { return make([]isc.Request, 0, 1) }) + requests, _ := slot.reqs.GetOrCreate(timestamp, func() []isc.OnLedgerRequest { return make([]isc.OnLedgerRequest, 0, 1) }) slot.reqs.Set(timestamp, append(requests, request)) return } @@ -88,13 +90,13 @@ func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.Request) { } } -func (tpi *timePoolImpl) TakeTill(timestamp time.Time) []isc.Request { - resp := []isc.Request{} +func (tpi *timePoolImpl) TakeTill(timestamp time.Time) []isc.OnLedgerRequest { + resp := []isc.OnLedgerRequest{} for slot := tpi.slots; slot != nil; slot = slot.next { if slot.from.After(timestamp) { break } - slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.Request) bool { + slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.OnLedgerRequest) bool { if ts == timestamp || ts.Before(timestamp) { resp = append(resp, tsReqs...) 
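				// Timestamps inside a slot are not ordered (reqs is a map
				// keyed by time.Time), so every bucket in the slot is tested
				// against the cutoff instead of stopping at the first miss;
				// the early exit happens only at slot granularity above.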
for _, req := range tsReqs { @@ -121,10 +123,10 @@ func (tpi *timePoolImpl) Has(reqRef *isc.RequestRef) bool { return tpi.requests.Has(reqRef.AsKey()) } -func (tpi *timePoolImpl) Filter(predicate func(request isc.Request, ts time.Time) bool) { +func (tpi *timePoolImpl) Cleanup(predicate func(request isc.OnLedgerRequest, ts time.Time) bool) { prevNext := &tpi.slots for slot := tpi.slots; slot != nil; slot = slot.next { - slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.Request) bool { + slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.OnLedgerRequest) bool { requests := tsReqs for i, req := range requests { if !predicate(req, ts) { diff --git a/packages/chain/mempool/time_pool_test.go b/packages/chain/mempool/time_pool_test.go index a5982e9bc4..8a88753fda 100644 --- a/packages/chain/mempool/time_pool_test.go +++ b/packages/chain/mempool/time_pool_test.go @@ -10,26 +10,31 @@ import ( "github.com/stretchr/testify/require" "pgregory.net/rapid" + iotago "github.com/iotaledger/iota.go/v3" + "github.com/iotaledger/iota.go/v3/tpkg" "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/testutil/testlogger" - "github.com/iotaledger/wasp/packages/vm/core/governance" - "github.com/iotaledger/wasp/packages/vm/gas" ) func TestTimePoolBasic(t *testing.T) { log := testlogger.NewLogger(t) - kp := cryptolib.NewKeyPair() tp := mempool.NewTimePool(func(i int) {}, log) t0 := time.Now() t1 := t0.Add(17 * time.Nanosecond) t2 := t0.Add(17 * time.Minute) t3 := t0.Add(17 * time.Hour) - r0 := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 0, gas.LimitsDefault.MaxGasPerRequest).Sign(kp) - r1 := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 1, gas.LimitsDefault.MaxGasPerRequest).Sign(kp) - r2 := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 2, gas.LimitsDefault.MaxGasPerRequest).Sign(kp) - r3 := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 3, gas.LimitsDefault.MaxGasPerRequest).Sign(kp) + + r0, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(0)) + require.NoError(t, err) + r1, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(1)) + require.NoError(t, err) + r2, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(2)) + require.NoError(t, err) + r3, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(3)) + require.NoError(t, err) + require.False(t, tp.Has(isc.RequestRefFromRequest(r0))) require.False(t, tp.Has(isc.RequestRefFromRequest(r1))) require.False(t, tp.Has(isc.RequestRefFromRequest(r2))) @@ -43,7 +48,7 @@ func TestTimePoolBasic(t *testing.T) { require.True(t, tp.Has(isc.RequestRefFromRequest(r2))) require.True(t, tp.Has(isc.RequestRefFromRequest(r3))) - var taken []isc.Request + var taken []isc.OnLedgerRequest taken = tp.TakeTill(t0) require.Len(t, taken, 1) @@ -106,7 +111,8 @@ func (sm *timePoolSM) Check(t *rapid.T) { func (sm *timePoolSM) AddRequest(t *rapid.T) { ts := time.Unix(rapid.Int64().Draw(t, "req.ts"), 0) - req := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 0, gas.LimitsDefault.MaxGasPerRequest).Sign(sm.kp) + req, err := 
isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(3)) + require.NoError(t, err) sm.tp.AddRequest(ts, req) sm.added++ } diff --git a/packages/chain/mempool/typed_pool.go b/packages/chain/mempool/typed_pool.go index 4398c9dc3e..78e23382e7 100644 --- a/packages/chain/mempool/typed_pool.go +++ b/packages/chain/mempool/typed_pool.go @@ -14,6 +14,10 @@ import ( "github.com/iotaledger/wasp/packages/kv/codec" ) +// TODO limit this list. +// TODO add gas to on-ledger requests +// TODO this list needs to be periodically re-filled from L1 once the activity is lower + type typedPool[V isc.Request] struct { waitReq WaitReq requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]] @@ -71,7 +75,7 @@ func (olp *typedPool[V]) Remove(request V) { } } -func (olp *typedPool[V]) Filter(predicate func(request V, ts time.Time) bool) { +func (olp *typedPool[V]) Cleanup(predicate func(request V, ts time.Time) bool) { olp.requests.ForEach(func(refKey isc.RequestRefKey, entry *typedPoolEntry[V]) bool { if !predicate(entry.req, entry.ts) { if olp.requests.Delete(refKey) { diff --git a/packages/chain/mempool/typed_pool_by_nonce.go b/packages/chain/mempool/typed_pool_by_nonce.go deleted file mode 100644 index c38b350f58..0000000000 --- a/packages/chain/mempool/typed_pool_by_nonce.go +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright 2020 IOTA Stiftung -// SPDX-License-Identifier: Apache-2.0 - -package mempool - -import ( - "fmt" - "io" - "slices" - "time" - - "github.com/iotaledger/hive.go/ds/shrinkingmap" - "github.com/iotaledger/hive.go/logger" - consGR "github.com/iotaledger/wasp/packages/chain/cons/cons_gr" - "github.com/iotaledger/wasp/packages/isc" - "github.com/iotaledger/wasp/packages/kv/codec" -) - -// keeps a map of requests ordered by nonce for each account -type offLedgerPool struct { - waitReq WaitReq - refLUT *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *OrderedPoolEntry] - // reqsByAcountOrdered keeps an ordered map of reqsByAcountOrdered for each account by nonce - reqsByAcountOrdered *shrinkingmap.ShrinkingMap[string, []*OrderedPoolEntry] // string is isc.AgentID.String() - sizeMetric func(int) - timeMetric func(time.Duration) - log *logger.Logger -} - -func NewOffledgerPool(waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) *offLedgerPool { - return &offLedgerPool{ - waitReq: waitReq, - reqsByAcountOrdered: shrinkingmap.New[string, []*OrderedPoolEntry](), - refLUT: shrinkingmap.New[isc.RequestRefKey, *OrderedPoolEntry](), - sizeMetric: sizeMetric, - timeMetric: timeMetric, - log: log, - } -} - -type OrderedPoolEntry struct { - req isc.OffLedgerRequest - old bool - ts time.Time - proposedFor []consGR.ConsensusID -} - -func (p *offLedgerPool) Has(reqRef *isc.RequestRef) bool { - return p.refLUT.Has(reqRef.AsKey()) -} - -func (p *offLedgerPool) Get(reqRef *isc.RequestRef) isc.OffLedgerRequest { - entry, exists := p.refLUT.Get(reqRef.AsKey()) - if !exists { - return isc.OffLedgerRequest(nil) - } - return entry.req -} - -func (p *offLedgerPool) Add(request isc.OffLedgerRequest) { - // TODO keep an ordered list by gas price - - // TODO drop the tx with the lowest price if the total number of requests is too big - - // TODO apply a similar limit to on-ledger requests - - ref := isc.RequestRefFromRequest(request) - entry := &OrderedPoolEntry{req: request, ts: time.Now()} - account := request.SenderAccount().String() - - if !p.refLUT.Set(ref.AsKey(), entry) { - p.log.Debugf("NOT ADDED, already exists. 
reqID: %v as key=%v, senderAccount: ", request.ID(), ref, account) - return // not added already exists - } - - defer func() { - p.log.Debugf("ADD %v as key=%v, senderAccount: %s", request.ID(), ref, account) - p.sizeMetric(p.refLUT.Size()) - p.waitReq.MarkAvailable(request) - }() - - reqsForAcount, exists := p.reqsByAcountOrdered.Get(account) - if !exists { - // no other requests for this account - p.reqsByAcountOrdered.Set(account, []*OrderedPoolEntry{entry}) - return - } - - // add to the account requests, keep the slice ordered - - // find the index where the new entry should be added - index, exists := slices.BinarySearchFunc(reqsForAcount, entry, - func(a, b *OrderedPoolEntry) int { - aNonce := a.req.Nonce() - bNonce := b.req.Nonce() - if aNonce == bNonce { - return 0 - } - if aNonce > bNonce { - return 1 - } - return -1 - }, - ) - if exists { - // same nonce, mark the existing request with overlapping nonce as "old", place the new one - // NOTE: do not delete the request here, as it might already be part of an on-going consensus round - reqsForAcount[index].old = true - } - - reqsForAcount = append(reqsForAcount, entry) // add to the end of the list (thus extending the array) - - // make room if target position is not at the end - if index != len(reqsForAcount)+1 { - copy(reqsForAcount[index+1:], reqsForAcount[index:]) - reqsForAcount[index] = entry - } - p.reqsByAcountOrdered.Set(account, reqsForAcount) -} - -func (p *offLedgerPool) Remove(request isc.OffLedgerRequest) { - refKey := isc.RequestRefFromRequest(request).AsKey() - entry, exists := p.refLUT.Get(refKey) - if !exists { - return // does not exist - } - defer func() { - p.sizeMetric(p.refLUT.Size()) - p.timeMetric(time.Since(entry.ts)) - }() - if p.refLUT.Delete(refKey) { - p.log.Debugf("DEL %v as key=%v", request.ID(), refKey) - } - account := entry.req.SenderAccount().String() - reqsByAccount, exists := p.reqsByAcountOrdered.Get(account) - if !exists { - p.log.Error("inconsistency trying to DEL %v as key=%v, no request list for account %s", request.ID(), refKey, account) - return - } - // find the request in the accounts map - indexToDel := slices.IndexFunc(reqsByAccount, func(e *OrderedPoolEntry) bool { - return refKey == isc.RequestRefFromRequest(e.req).AsKey() - }) - if indexToDel == -1 { - p.log.Error("inconsistency trying to DEL %v as key=%v, request not found in list for account %s", request.ID(), refKey, account) - return - } - if len(reqsByAccount) == 1 { // just remove the entire array for the account - p.reqsByAcountOrdered.Delete(account) - return - } - reqsByAccount[indexToDel] = nil // remove the pointer reference to allow GC of the entry object - reqsByAccount = slices.Delete(reqsByAccount, indexToDel, indexToDel+1) - p.reqsByAcountOrdered.Set(account, reqsByAccount) -} - -func (p *offLedgerPool) Iterate(f func(account string, requests []*OrderedPoolEntry)) { - p.reqsByAcountOrdered.ForEach(func(acc string, entries []*OrderedPoolEntry) bool { - f(acc, slices.Clone(entries)) - return true - }) -} - -func (p *offLedgerPool) Filter(predicate func(request isc.OffLedgerRequest, ts time.Time) bool) { - p.refLUT.ForEach(func(refKey isc.RequestRefKey, entry *OrderedPoolEntry) bool { - if !predicate(entry.req, entry.ts) { - p.Remove(entry.req) - } - return true - }) - p.sizeMetric(p.refLUT.Size()) -} - -func (p *offLedgerPool) StatusString() string { - return fmt.Sprintf("{|req|=%d}", p.refLUT.Size()) -} - -func (p *offLedgerPool) WriteContent(w io.Writer) { - p.reqsByAcountOrdered.ForEach(func(_ string, list 
[]*OrderedPoolEntry) bool { - for _, entry := range list { - jsonData, err := isc.RequestToJSON(entry.req) - if err != nil { - return false // stop iteration - } - _, err = w.Write(codec.EncodeUint32(uint32(len(jsonData)))) - if err != nil { - return false // stop iteration - } - _, err = w.Write(jsonData) - if err != nil { - return false // stop iteration - } - } - return true - }) -} diff --git a/packages/chain/mempool/typed_pool_by_nonce_test.go b/packages/chain/mempool/typed_pool_by_nonce_test.go deleted file mode 100644 index 8bbb96d291..0000000000 --- a/packages/chain/mempool/typed_pool_by_nonce_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package mempool - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/iotaledger/wasp/packages/isc" - "github.com/iotaledger/wasp/packages/testutil" - "github.com/iotaledger/wasp/packages/testutil/testkey" - "github.com/iotaledger/wasp/packages/testutil/testlogger" -) - -func TestSomething(t *testing.T) { - waitReq := NewWaitReq(waitRequestCleanupEvery) - pool := NewOffledgerPool(waitReq, func(int) {}, func(time.Duration) {}, testlogger.NewSilentLogger("", true)) - - // generate a bunch of requests for the same account - kp, addr := testkey.GenKeyAddr() - agentID := isc.NewAgentID(addr) - - req0 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 0, kp) - req1 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 1, kp) - req2 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 2, kp) - req2new := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 2, kp) - pool.Add(req0) - pool.Add(req1) - pool.Add(req1) // try to add the same request many times - pool.Add(req2) - pool.Add(req1) - require.EqualValues(t, 3, pool.refLUT.Size()) - require.EqualValues(t, 1, pool.reqsByAcountOrdered.Size()) - reqsInPoolForAccount, _ := pool.reqsByAcountOrdered.Get(agentID.String()) - require.Len(t, reqsInPoolForAccount, 3) - pool.Add(req2new) - pool.Add(req2new) - require.EqualValues(t, 4, pool.refLUT.Size()) - require.EqualValues(t, 1, pool.reqsByAcountOrdered.Size()) - reqsInPoolForAccount, _ = pool.reqsByAcountOrdered.Get(agentID.String()) - require.Len(t, reqsInPoolForAccount, 4) - - // try to remove everything during iteration - pool.Iterate(func(account string, entries []*OrderedPoolEntry) { - for _, e := range entries { - pool.Remove(e.req) - } - }) - require.EqualValues(t, 0, pool.refLUT.Size()) - require.EqualValues(t, 0, pool.reqsByAcountOrdered.Size()) -} diff --git a/packages/isc/request.go b/packages/isc/request.go index 05a3e18926..80835d586e 100644 --- a/packages/isc/request.go +++ b/packages/isc/request.go @@ -81,7 +81,8 @@ type OffLedgerRequest interface { ChainID() ChainID Nonce() uint64 VerifySignature() error - GasPrice() (price *big.Int, specified bool) + EVMTransaction() *types.Transaction // TODO remove? + GasPrice() *big.Int } type OnLedgerRequest interface { diff --git a/packages/isc/request_evmcall.go b/packages/isc/request_evmcall.go index 7a92242e5b..6e24caf479 100644 --- a/packages/isc/request_evmcall.go +++ b/packages/isc/request_evmcall.go @@ -128,6 +128,6 @@ func (req *evmOffLedgerCallRequest) EVMCallMsg() *ethereum.CallMsg { } // GasPrice implements OffLedgerRequest. 
-func (req *evmOffLedgerCallRequest) GasPrice() (price *big.Int, specified bool) { - return req.callMsg.GasPrice, true +func (req *evmOffLedgerCallRequest) GasPrice() *big.Int { + return req.callMsg.GasPrice } diff --git a/packages/isc/request_evmtx.go b/packages/isc/request_evmtx.go index 5e6543b246..6ccfdade73 100644 --- a/packages/isc/request_evmtx.go +++ b/packages/isc/request_evmtx.go @@ -155,6 +155,10 @@ func (req *evmOffLedgerTxRequest) EVMCallMsg() *ethereum.CallMsg { return EVMCallDataFromTx(req.tx) } -func (req *evmOffLedgerTxRequest) GasPrice() (price *big.Int, specified bool) { - return req.tx.GasPrice(), true +func (req *evmOffLedgerTxRequest) TxValue() *big.Int { + return req.tx.Value() +} + +func (req *evmOffLedgerTxRequest) GasPrice() *big.Int { + return req.tx.GasPrice() } diff --git a/packages/isc/request_offledger.go b/packages/isc/request_offledger.go index fc50095f1e..f8a2971546 100644 --- a/packages/isc/request_offledger.go +++ b/packages/isc/request_offledger.go @@ -8,6 +8,7 @@ import ( "time" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" "github.com/minio/blake2b-simd" iotago "github.com/iotaledger/iota.go/v3" @@ -273,6 +274,10 @@ func (req *OffLedgerRequestData) WithSender(sender *cryptolib.PublicKey) Unsigne return req } -func (req *OffLedgerRequestData) GasPrice() (price *big.Int, specified bool) { - return big.NewInt(0), false +func (*OffLedgerRequestData) EVMTransaction() *types.Transaction { + return nil +} + +func (req *OffLedgerRequestData) GasPrice() *big.Int { + return nil } diff --git a/packages/testutil/dummyrequest.go b/packages/testutil/dummyrequest.go index 697e13b83b..a0c9b49201 100644 --- a/packages/testutil/dummyrequest.go +++ b/packages/testutil/dummyrequest.go @@ -1,6 +1,12 @@ package testutil import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/kv/dict" @@ -24,3 +30,26 @@ func DummyOffledgerRequestForAccount(chainID isc.ChainID, nonce uint64, kp *cryp req := isc.NewOffLedgerRequest(chainID, contract, entrypoint, args, nonce, gas.LimitsDefault.MaxGasPerRequest) return req.Sign(kp) } + +func DummyEVMRequest(chainID isc.ChainID, gasPrice *big.Int) isc.OffLedgerRequest { + key, err := crypto.GenerateKey() + if err != nil { + panic(err) + } + + tx := types.MustSignNewTx(key, types.NewEIP155Signer(big.NewInt(0)), + &types.LegacyTx{ + Nonce: 0, + To: &common.MaxAddress, + Value: big.NewInt(123), + Gas: 10000, + GasPrice: gasPrice, + Data: []byte{}, + }) + + req, err := isc.NewEVMOffLedgerTxRequest(chainID, tx) + if err != nil { + panic(err) + } + return req +} diff --git a/packages/testutil/run_heavy_tests_false.go b/packages/testutil/run_heavy_tests_false.go index 7f8f874219..e5680b6c8f 100644 --- a/packages/testutil/run_heavy_tests_false.go +++ b/packages/testutil/run_heavy_tests_false.go @@ -5,7 +5,6 @@ package testutil import "testing" -//nolint:gocritic // its not a test function, but gets called by other test functions func RunHeavy(t *testing.T) { t.Logf("skipping heavy test %s", t.Name()) t.SkipNow() diff --git a/packages/vm/core/governance/stateaccess.go b/packages/vm/core/governance/stateaccess.go index dee372fde2..78488c99a5 100644 --- a/packages/vm/core/governance/stateaccess.go +++ b/packages/vm/core/governance/stateaccess.go @@ -4,6 +4,8 @@ package governance import ( + 
"math/big" + "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/kv" @@ -65,3 +67,8 @@ func (sa *StateAccess) ChainOwnerID() isc.AgentID { func (sa *StateAccess) GetBlockKeepAmount() int32 { return GetBlockKeepAmount(sa.state) } + +func (sa *StateAccess) DefaultGasPrice() *big.Int { + // TODO return the equivalent price defined by the feepolicy + panic("TODO implement DefaultGasPrice") +} From 82dce09f78888b3ec78053a70bbcf0073f535331 Mon Sep 17 00:00:00 2001 From: Jorge Silva Date: Wed, 17 Apr 2024 13:21:33 +0100 Subject: [PATCH 3/7] propose only top N more expensive requests (offledger) --- components/chains/component.go | 10 ++- components/chains/params.go | 11 ++- packages/chain/mempool/mempool.go | 88 ++++++++++++++++-------- packages/chain/mempool/mempool_test.go | 18 ++++- packages/chain/mempool/offledger_pool.go | 31 +++++---- packages/chain/mempool/time_pool.go | 1 + packages/chain/mempool/typed_pool.go | 1 + packages/chain/node.go | 4 +- packages/chain/node_test.go | 10 ++- packages/chains/chains.go | 9 +-- packages/isc/request.go | 1 - packages/isc/request_evmcall.go | 2 +- packages/isc/request_offledger.go | 5 -- 13 files changed, 125 insertions(+), 66 deletions(-) diff --git a/components/chains/component.go b/components/chains/component.go index 57565ea0c6..46905bf975 100644 --- a/components/chains/component.go +++ b/components/chains/component.go @@ -11,6 +11,7 @@ import ( hiveshutdown "github.com/iotaledger/hive.go/app/shutdown" "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/chain/cmt_log" + "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/chains" "github.com/iotaledger/wasp/packages/daemon" "github.com/iotaledger/wasp/packages/database" @@ -127,7 +128,14 @@ func provide(c *dig.Container) error { deps.NodeIdentityProvider, deps.ConsensusStateRegistry, deps.ChainListener, - ParamsChains.MempoolTTL, + mempool.Settings{ + TTL: ParamsChains.MempoolTTL, + MaxOffledgerInPool: ParamsChains.MempoolMaxOffledgerInPool, + MaxOnledgerInPool: ParamsChains.MempoolMaxOnledgerInPool, + MaxTimedInPool: ParamsChains.MempoolMaxTimedInPool, + MaxOnledgerToPropose: ParamsChains.MempoolMaxOnledgerToPropose, + MaxOffledgerToPropose: ParamsChains.MempoolMaxOffledgerToPropose, + }, ParamsChains.BroadcastInterval, shutdown.NewCoordinator("chains", Component.Logger().Named("Shutdown")), deps.ChainMetricsProvider, diff --git a/components/chains/params.go b/components/chains/params.go index 261de5d559..697e52f574 100644 --- a/components/chains/params.go +++ b/components/chains/params.go @@ -20,12 +20,11 @@ type ParametersChains struct { ConsensusInstsInAdvance int `default:"3" usage:""` AwaitReceiptCleanupEvery int `default:"100" usage:"for every this number AwaitReceipt will be cleaned up"` MempoolTTL time.Duration `default:"24h" usage:"Time that requests are allowed to sit in the mempool without being processed"` - MempoolMaxOffledgerInPool time.Duration `default:"10000" usage:"Maximum number of off-ledger requests kept in the mempool"` - MempoolMaxOnledgerInPool time.Duration `default:"2000" usage:"Maximum number of on-ledger requests kept in the mempool"` - MempoolMaxTimedInPool time.Duration `default:"500" usage:"Maximum number of timed on-ledger requests kept in the mempool"` - MempoolMaxOnledgerToPropose time.Duration `default:"200" usage:"Maximum number of offledger requests to propose for the next block"` - MempoolMaxOffledgerToPropose time.Duration 
`default:"100" usage:"Maximum number of on-ledger requests to propose for the next block"` - MempoolMAxTimedToPropose time.Duration `default:"100" usage:"Maximum number of timed on-ledger requests to propose for the next block"` + MempoolMaxOffledgerInPool int `default:"2000" usage:"Maximum number of off-ledger requests kept in the mempool"` + MempoolMaxOnledgerInPool int `default:"1000" usage:"Maximum number of on-ledger requests kept in the mempool"` + MempoolMaxTimedInPool int `default:"100" usage:"Maximum number of timed on-ledger requests kept in the mempool"` + MempoolMaxOffledgerToPropose int `default:"500" usage:"Maximum number of off-ledger requests to propose for the next block"` + MempoolMaxOnledgerToPropose int `default:"100" usage:"Maximum number of on-ledger requests to propose for the next block (includes timed requests)"` } type ParametersWAL struct { diff --git a/packages/chain/mempool/mempool.go b/packages/chain/mempool/mempool.go index 6fa416938c..1bcc680a1c 100644 --- a/packages/chain/mempool/mempool.go +++ b/packages/chain/mempool/mempool.go @@ -46,6 +46,7 @@ import ( "context" "fmt" "io" + "slices" "time" "github.com/samber/lo" @@ -113,6 +114,15 @@ type Mempool interface { GetContents() io.Reader } +type Settings struct { + TTL time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed) + MaxOffledgerInPool int + MaxOnledgerInPool int // TODO unused + MaxTimedInPool int // TODO unused + MaxOnledgerToPropose int // (including timed-requests) // TODO unused + MaxOffledgerToPropose int +} + type RequestPool[V isc.Request] interface { Has(reqRef *isc.RequestRef) bool Get(reqRef *isc.RequestRef) V @@ -136,7 +146,7 @@ type mempoolImpl struct { tangleTime time.Time timePool TimePool // TODO limit this pool onLedgerPool RequestPool[isc.OnLedgerRequest] // TODO limit this pool - offLedgerPool *offLedgerPool + offLedgerPool *OffLedgerPool distSync gpa.GPA chainHeadAO *isc.AliasOutputWithID chainHeadState state.State @@ -160,7 +170,7 @@ type mempoolImpl struct { netPeerPubs map[gpa.NodeID]*cryptolib.PublicKey net peering.NetworkProvider activeConsensusInstances []consGR.ConsensusID - ttl time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed) + settings Settings broadcastInterval time.Duration // how often requests should be rebroadcasted log *logger.Logger metrics *metrics.ChainMempoolMetrics @@ -214,9 +224,6 @@ type reqTrackNewChainHead struct { responseCh chan<- bool // only for tests, shouldn't be used in the chain package } -// TODO make these configurable -const MaxOffLedgerPoolSize = 1000 - func New( ctx context.Context, chainID isc.ChainID, @@ -226,7 +233,7 @@ func New( metrics *metrics.ChainMempoolMetrics, pipeMetrics *metrics.ChainPipeMetrics, listener ChainListener, - ttl time.Duration, + settings Settings, broadcastInterval time.Duration, ) Mempool { netPeeringID := peering.HashPeeringIDFromBytes(chainID.Bytes(), []byte("Mempool")) // ChainID × Mempool @@ -236,7 +243,7 @@ func New( tangleTime: time.Time{}, timePool: NewTimePool(metrics.SetTimePoolSize, log.Named("TIM")), onLedgerPool: NewTypedPool[isc.OnLedgerRequest](waitReq, metrics.SetOnLedgerPoolSize, metrics.SetOnLedgerReqTime, log.Named("ONL")), - offLedgerPool: NewOffledgerPool(MaxOffLedgerPoolSize, waitReq, metrics.SetOffLedgerPoolSize, metrics.SetOffLedgerReqTime, log.Named("OFF")), + offLedgerPool: NewOffledgerPool(settings.MaxOffledgerInPool, waitReq, metrics.SetOffLedgerPoolSize, 
metrics.SetOffLedgerReqTime, log.Named("OFF")), chainHeadAO: nil, serverNodesUpdatedPipe: pipe.NewInfinitePipe[*reqServerNodesUpdated](), serverNodes: []*cryptolib.PublicKey{}, @@ -257,7 +264,7 @@ func New( net: net, consensusInstancesUpdatedPipe: pipe.NewInfinitePipe[*reqConsensusInstancesUpdated](), activeConsensusInstances: []consGR.ConsensusID{}, - ttl: ttl, + settings: settings, broadcastInterval: broadcastInterval, log: log, metrics: metrics, @@ -593,11 +600,8 @@ func (mpi *mempoolImpl) handleConsensusProposal(recv *reqConsensusProposal) { mpi.handleConsensusProposalForChainHead(recv) } +//nolint:gocyclo func (mpi *mempoolImpl) refsToPropose(consensusID consGR.ConsensusID) []*isc.RequestRef { - // TODO add a limit of N requests to propose. - - // TODO change to propose the "top priced requests" - // // The case for matching ChainHeadAO and request BaseAO reqRefs := []*isc.RequestRef{} @@ -606,21 +610,28 @@ func (mpi *mempoolImpl) refsToPropose(consensusID consGR.ConsensusID) []*isc.Req if isc.RequestIsExpired(request, mpi.tangleTime) { return false // Drop it from the mempool } - if isc.RequestIsUnlockable(request, mpi.chainID.AsAddress(), mpi.tangleTime) { + if len(reqRefs) < mpi.settings.MaxOnledgerToPropose && isc.RequestIsUnlockable(request, mpi.chainID.AsAddress(), mpi.tangleTime) { reqRefs = append(reqRefs, isc.RequestRefFromRequest(request)) } return true // Keep them for now }) } - mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry) { - agentID, err := isc.AgentIDFromString(account) - if err != nil { - panic(fmt.Errorf("invalid agentID string: %s", err.Error())) - } - accountNonce := mpi.nonce(agentID) - for _, e := range entries { - if time.Since(e.ts) > mpi.ttl { // stop proposing after TTL + // + // iterate the ordered txs and add the first valid ones (respect nonce) to propose + // stop iterating when either: got MaxOffledgerToPropose, or no requests were added during last iteration (there are gaps in nonces) + accNonces := make(map[string]uint64) // cache of account nonces so we don't propose gaps + orderedList := slices.Clone(mpi.offLedgerPool.orderedByGasPrice) // clone the ordered list of references to requests, so we can alter it safely + for { + added := 0 + addedThisCycle := false + for i, e := range orderedList { + if e == nil { + continue + } + // + // drop tx with expired TTL + if time.Since(e.ts) > mpi.settings.TTL { // stop proposing after TTL if !lo.Some(mpi.consensusInstances, e.proposedFor) { // request not used in active consensus anymore, remove it mpi.log.Debugf("refsToPropose, request TTL expired, removing: %s", e.req.ID().String()) @@ -633,31 +644,52 @@ func (mpi *mempoolImpl) refsToPropose(consensusID consGR.ConsensusID) []*isc.Req if e.old { // this request was marked as "old", do not propose it - mpi.log.Debugf("refsToPropose, account: %s, skipping old request: %s", account, e.req.ID().String()) + mpi.log.Debugf("refsToPropose, skipping old request: %s", e.req.ID().String()) continue } + reqAccount := e.req.SenderAccount() + reqAccountKey := reqAccount.String() + accountNonce, ok := accNonces[reqAccountKey] + if !ok { + accountNonce = mpi.nonce(reqAccount) + accNonces[reqAccountKey] = accountNonce + } + reqNonce := e.req.Nonce() if reqNonce < accountNonce { // nonce too old, delete - mpi.log.Debugf("refsToPropose, account: %s, removing request (%s) with old nonce (%d) from the pool", account, e.req.ID(), e.req.Nonce()) + mpi.log.Debugf("refsToPropose, account: %s, removing request (%s) with old nonce (%d) from the pool", 
reqAccount, e.req.ID(), e.req.Nonce()) mpi.offLedgerPool.Remove(e.req) continue } if reqNonce == accountNonce { // expected nonce, add it to the list to propose - mpi.log.Debugf("refsToPropose, account: %s, proposing reqID %s with nonce: %d", account, e.req.ID().String(), e.req.Nonce()) + mpi.log.Debugf("refsToPropose, account: %s, proposing reqID %s with nonce: %d", reqAccount, e.req.ID().String(), e.req.Nonce()) reqRefs = append(reqRefs, isc.RequestRefFromRequest(e.req)) e.proposedFor = append(e.proposedFor, consensusID) + addedThisCycle = true + added++ accountNonce++ // increment the account nonce to match the next valid request + accNonces[reqAccountKey] = accountNonce + // delete from this list + orderedList[i] = nil } + + if added >= mpi.settings.MaxOffledgerToPropose { + break // got enough requests + } + if reqNonce > accountNonce { - mpi.log.Debugf("refsToPropose, account: %s, req %s has a nonce %d which is too high (expected %d), won't be proposed", account, e.req.ID().String(), e.req.Nonce(), accountNonce) - return // no more valid nonces for this account, continue to the next account + mpi.log.Debugf("refsToPropose, account: %s, req %s has a nonce %d which is too high (expected %d), won't be proposed", reqAccount, e.req.ID().String(), e.req.Nonce(), accountNonce) + continue // skip request } } - }) + if !addedThisCycle || (added >= mpi.settings.MaxOffledgerToPropose) { + break + } + } return reqRefs } @@ -917,7 +949,7 @@ func (mpi *mempoolImpl) handleRePublishTimeTick() { func (mpi *mempoolImpl) handleForceCleanMempool() { mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry) { for _, e := range entries { - if time.Since(e.ts) > mpi.ttl && !lo.Some(mpi.consensusInstances, e.proposedFor) { + if time.Since(e.ts) > mpi.settings.TTL && !lo.Some(mpi.consensusInstances, e.proposedFor) { mpi.log.Debugf("handleForceCleanMempool, request TTL expired, removing: %s", e.req.ID().String()) mpi.offLedgerPool.Remove(e.req) } diff --git a/packages/chain/mempool/mempool_test.go b/packages/chain/mempool/mempool_test.go index fd92460a3c..f0024c2642 100644 --- a/packages/chain/mempool/mempool_test.go +++ b/packages/chain/mempool/mempool_test.go @@ -679,7 +679,14 @@ func TestTTL(t *testing.T) { chainMetrics.Mempool, chainMetrics.Pipe, chain.NewEmptyChainListener(), - 200*time.Millisecond, // 200ms TTL + mempool.Settings{ + TTL: 200 * time.Millisecond, + MaxOffledgerInPool: 1000, + MaxOnledgerInPool: 1000, + MaxTimedInPool: 1000, + MaxOnledgerToPropose: 1000, + MaxOffledgerToPropose: 1000, + }, 1*time.Second, ) defer te.close() @@ -804,7 +811,14 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv { chainMetrics.Mempool, chainMetrics.Pipe, chain.NewEmptyChainListener(), - 24*time.Hour, + mempool.Settings{ + TTL: 24 * time.Hour, + MaxOffledgerInPool: 1000, + MaxOnledgerInPool: 1000, + MaxTimedInPool: 1000, + MaxOnledgerToPropose: 1000, + MaxOffledgerToPropose: 1000, + }, 1*time.Second, ) } diff --git a/packages/chain/mempool/offledger_pool.go b/packages/chain/mempool/offledger_pool.go index 4e142e468e..07751dc755 100644 --- a/packages/chain/mempool/offledger_pool.go +++ b/packages/chain/mempool/offledger_pool.go @@ -18,7 +18,7 @@ import ( ) // keeps a map of requests ordered by nonce for each account -type offLedgerPool struct { +type OffLedgerPool struct { waitReq WaitReq refLUT *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *OrderedPoolEntry] // reqsByAcountOrdered keeps an ordered map of reqsByAcountOrdered for each account by nonce @@ -32,8 +32,8 @@ type 
offLedgerPool struct { log *logger.Logger } -func NewOffledgerPool(maxPoolSize int, waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) *offLedgerPool { - return &offLedgerPool{ +func NewOffledgerPool(maxPoolSize int, waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) *OffLedgerPool { + return &OffLedgerPool{ waitReq: waitReq, refLUT: shrinkingmap.New[isc.RequestRefKey, *OrderedPoolEntry](), reqsByAcountOrdered: shrinkingmap.New[string, []*OrderedPoolEntry](), @@ -53,11 +53,11 @@ type OrderedPoolEntry struct { proposedFor []consGR.ConsensusID } -func (p *offLedgerPool) Has(reqRef *isc.RequestRef) bool { +func (p *OffLedgerPool) Has(reqRef *isc.RequestRef) bool { return p.refLUT.Has(reqRef.AsKey()) } -func (p *offLedgerPool) Get(reqRef *isc.RequestRef) isc.OffLedgerRequest { +func (p *OffLedgerPool) Get(reqRef *isc.RequestRef) isc.OffLedgerRequest { entry, exists := p.refLUT.Get(reqRef.AsKey()) if !exists { return isc.OffLedgerRequest(nil) @@ -65,7 +65,7 @@ func (p *offLedgerPool) Get(reqRef *isc.RequestRef) isc.OffLedgerRequest { return entry.req } -func (p *offLedgerPool) Add(request isc.OffLedgerRequest) { +func (p *OffLedgerPool) Add(request isc.OffLedgerRequest) { ref := isc.RequestRefFromRequest(request) entry := &OrderedPoolEntry{req: request, ts: time.Now()} account := request.SenderAccount().String() @@ -139,7 +139,7 @@ func (p *offLedgerPool) Add(request isc.OffLedgerRequest) { } // LimitPoolSize drops the txs with the lowest price if the total number of requests is too big -func (p *offLedgerPool) LimitPoolSize() { +func (p *OffLedgerPool) LimitPoolSize() { // TODO apply a similar limit to on-ledger/time pool (it cannot be unbound) if len(p.orderedByGasPrice) <= p.maxPoolSize { return // nothing to do @@ -159,11 +159,12 @@ func (p *offLedgerPool) LimitPoolSize() { } for _, r := range reqsToDelete { + p.log.Debugf("LimitPoolSize dropping request: %v", r.req.ID()) p.Remove(r.req) } } -func (p *offLedgerPool) GasPrice(e *OrderedPoolEntry) *big.Int { +func (p *OffLedgerPool) GasPrice(e *OrderedPoolEntry) *big.Int { price := e.req.GasPrice() if price != nil { return price @@ -172,7 +173,7 @@ func (p *offLedgerPool) GasPrice(e *OrderedPoolEntry) *big.Int { return p.minGasPrice } -func (p *offLedgerPool) SetMinGasPrice(newPrice *big.Int) { +func (p *OffLedgerPool) SetMinGasPrice(newPrice *big.Int) { if p.minGasPrice.Cmp(newPrice) == 0 { // no change return @@ -182,7 +183,7 @@ func (p *offLedgerPool) SetMinGasPrice(newPrice *big.Int) { slices.SortFunc(p.orderedByGasPrice, p.reqSort) } -func (p *offLedgerPool) reqSort(a, b *OrderedPoolEntry) int { +func (p *OffLedgerPool) reqSort(a, b *OrderedPoolEntry) int { cmp := p.GasPrice(a).Cmp(p.GasPrice(b)) if cmp != 0 { return cmp @@ -202,7 +203,7 @@ func (p *offLedgerPool) reqSort(a, b *OrderedPoolEntry) int { return 0 } -func (p *offLedgerPool) Remove(request isc.OffLedgerRequest) { +func (p *OffLedgerPool) Remove(request isc.OffLedgerRequest) { refKey := isc.RequestRefFromRequest(request).AsKey() entry, exists := p.refLUT.Get(refKey) if !exists { @@ -255,14 +256,14 @@ func (p *offLedgerPool) Remove(request isc.OffLedgerRequest) { } } -func (p *offLedgerPool) Iterate(f func(account string, requests []*OrderedPoolEntry)) { +func (p *OffLedgerPool) Iterate(f func(account string, requests []*OrderedPoolEntry)) { p.reqsByAcountOrdered.ForEach(func(acc string, entries []*OrderedPoolEntry) bool { f(acc, slices.Clone(entries)) return true }) } -func (p *offLedgerPool) 
Cleanup(predicate func(request isc.OffLedgerRequest, ts time.Time) bool) { +func (p *OffLedgerPool) Cleanup(predicate func(request isc.OffLedgerRequest, ts time.Time) bool) { p.refLUT.ForEach(func(refKey isc.RequestRefKey, entry *OrderedPoolEntry) bool { if !predicate(entry.req, entry.ts) { p.Remove(entry.req) @@ -272,11 +273,11 @@ func (p *offLedgerPool) Cleanup(predicate func(request isc.OffLedgerRequest, ts p.sizeMetric(p.refLUT.Size()) } -func (p *offLedgerPool) StatusString() string { +func (p *OffLedgerPool) StatusString() string { return fmt.Sprintf("{|req|=%d}", p.refLUT.Size()) } -func (p *offLedgerPool) WriteContent(w io.Writer) { +func (p *OffLedgerPool) WriteContent(w io.Writer) { p.reqsByAcountOrdered.ForEach(func(_ string, list []*OrderedPoolEntry) bool { for _, entry := range list { jsonData, err := isc.RequestToJSON(entry.req) diff --git a/packages/chain/mempool/time_pool.go b/packages/chain/mempool/time_pool.go index 2a171f9ff5..48f4291823 100644 --- a/packages/chain/mempool/time_pool.go +++ b/packages/chain/mempool/time_pool.go @@ -124,6 +124,7 @@ func (tpi *timePoolImpl) Has(reqRef *isc.RequestRef) bool { } func (tpi *timePoolImpl) Cleanup(predicate func(request isc.OnLedgerRequest, ts time.Time) bool) { + // TODO iterate using an order (gas price) prevNext := &tpi.slots for slot := tpi.slots; slot != nil; slot = slot.next { slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.OnLedgerRequest) bool { diff --git a/packages/chain/mempool/typed_pool.go b/packages/chain/mempool/typed_pool.go index 78e23382e7..51b6967747 100644 --- a/packages/chain/mempool/typed_pool.go +++ b/packages/chain/mempool/typed_pool.go @@ -76,6 +76,7 @@ func (olp *typedPool[V]) Remove(request V) { } func (olp *typedPool[V]) Cleanup(predicate func(request V, ts time.Time) bool) { + // TODO iterate using an order (gas price) olp.requests.ForEach(func(refKey isc.RequestRefKey, entry *typedPoolEntry[V]) bool { if !predicate(entry.req, entry.ts) { if olp.requests.Delete(refKey) { diff --git a/packages/chain/node.go b/packages/chain/node.go index ee29882119..5527225184 100644 --- a/packages/chain/node.go +++ b/packages/chain/node.go @@ -282,7 +282,7 @@ func New( recoveryTimeout time.Duration, validatorAgentID isc.AgentID, smParameters sm_gpa.StateManagerParameters, - mempoolTTL time.Duration, + mempoolSettings mempool.Settings, mempoolBroadcastInterval time.Duration, ) (Chain, error) { log.Debugf("Starting the chain, chainID=%v", chainID) @@ -431,7 +431,7 @@ func New( chainMetrics.Mempool, chainMetrics.Pipe, cni.listener, - mempoolTTL, + mempoolSettings, mempoolBroadcastInterval, ) cni.chainMgr = gpa.NewAckHandler(cni.me, chainMgr.AsGPA(), RedeliveryPeriod) diff --git a/packages/chain/node_test.go b/packages/chain/node_test.go index ec35b9558d..31b2c3f695 100644 --- a/packages/chain/node_test.go +++ b/packages/chain/node_test.go @@ -19,6 +19,7 @@ import ( iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/contracts/native/inccounter" "github.com/iotaledger/wasp/packages/chain" + "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa/sm_gpa_utils" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_snapshots" @@ -468,7 +469,14 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv { 10*time.Second, accounts.CommonAccount(), sm_gpa.NewStateManagerParameters(), - 24*time.Hour, + mempool.Settings{ + TTL: 24 * time.Hour, + MaxOffledgerInPool: 1000, + 
MaxOnledgerInPool: 1000, + MaxTimedInPool: 1000, + MaxOnledgerToPropose: 1000, + MaxOffledgerToPropose: 1000, + }, 1*time.Second, ) require.NoError(t, err) diff --git a/packages/chains/chains.go b/packages/chains/chains.go index 0076e5e160..51f4897820 100644 --- a/packages/chains/chains.go +++ b/packages/chains/chains.go @@ -17,6 +17,7 @@ import ( iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/chain/cmt_log" + "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa/sm_gpa_utils" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_snapshots" @@ -92,7 +93,7 @@ type Chains struct { validatorFeeAddr iotago.Address - mempoolTTL time.Duration + mempoolSettings mempool.Settings mempoolBroadcastInterval time.Duration } @@ -136,7 +137,7 @@ func New( nodeIdentityProvider registry.NodeIdentityProvider, consensusStateRegistry cmt_log.ConsensusStateRegistry, chainListener chain.ChainListener, - mempoolTTL time.Duration, + mempoolSettings mempool.Settings, mempoolBroadcastInterval time.Duration, shutdownCoordinator *shutdown.Coordinator, chainMetricsProvider *metrics.ChainMetricsProvider, @@ -186,7 +187,7 @@ func New( dkShareRegistryProvider: dkShareRegistryProvider, nodeIdentityProvider: nodeIdentityProvider, chainListener: nil, // See bellow. - mempoolTTL: mempoolTTL, + mempoolSettings: mempoolSettings, mempoolBroadcastInterval: mempoolBroadcastInterval, consensusStateRegistry: consensusStateRegistry, shutdownCoordinator: shutdownCoordinator, @@ -421,7 +422,7 @@ func (c *Chains) activateWithoutLocking(chainID isc.ChainID) error { //nolint:fu c.recoveryTimeout, validatorAgentID, stateManagerParameters, - c.mempoolTTL, + c.mempoolSettings, c.mempoolBroadcastInterval, ) if err != nil { diff --git a/packages/isc/request.go b/packages/isc/request.go index 80835d586e..c4a049db3c 100644 --- a/packages/isc/request.go +++ b/packages/isc/request.go @@ -81,7 +81,6 @@ type OffLedgerRequest interface { ChainID() ChainID Nonce() uint64 VerifySignature() error - EVMTransaction() *types.Transaction // TODO remove? GasPrice() *big.Int } diff --git a/packages/isc/request_evmcall.go b/packages/isc/request_evmcall.go index 6e24caf479..aacc288067 100644 --- a/packages/isc/request_evmcall.go +++ b/packages/isc/request_evmcall.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io" + "math/big" "github.com/ethereum/go-ethereum" @@ -127,7 +128,6 @@ func (req *evmOffLedgerCallRequest) EVMCallMsg() *ethereum.CallMsg { return &req.callMsg } -// GasPrice implements OffLedgerRequest. 
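+// GasPrice returns the gas price this request offers to pay. For an EVM
+// call message it is read directly from callMsg.GasPrice, which may be nil
+// when the caller did not set one; the mempool then prices the request at
+// the chain's current default (minimum) gas price.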
func (req *evmOffLedgerCallRequest) GasPrice() *big.Int { return req.callMsg.GasPrice } diff --git a/packages/isc/request_offledger.go b/packages/isc/request_offledger.go index f8a2971546..ccd778c0ac 100644 --- a/packages/isc/request_offledger.go +++ b/packages/isc/request_offledger.go @@ -8,7 +8,6 @@ import ( "time" "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/core/types" "github.com/minio/blake2b-simd" iotago "github.com/iotaledger/iota.go/v3" @@ -274,10 +273,6 @@ func (req *OffLedgerRequestData) WithSender(sender *cryptolib.PublicKey) Unsigne return req } -func (*OffLedgerRequestData) EVMTransaction() *types.Transaction { - return nil -} - func (req *OffLedgerRequestData) GasPrice() *big.Int { return nil } From 526e8649a378563ec118a3749bdb7061e8ada5d4 Mon Sep 17 00:00:00 2001 From: Jorge Silva Date: Fri, 19 Apr 2024 09:30:10 +0100 Subject: [PATCH 4/7] limit on-ledger and timed pools --- packages/chain/mempool/mempool.go | 38 +++-- packages/chain/mempool/mempool_test.go | 6 +- packages/chain/mempool/offledger_pool.go | 32 ++-- packages/chain/mempool/time_pool.go | 67 ++++++--- packages/chain/mempool/time_pool_test.go | 36 ++++- packages/chain/mempool/typed_pool.go | 162 ++++++++++++++++----- packages/chain/mempool/typed_pool_test.go | 39 +++++ packages/evm/jsonrpc/jsonrpctest/env.go | 13 ++ packages/vm/core/governance/stateaccess.go | 4 +- 9 files changed, 309 insertions(+), 88 deletions(-) create mode 100644 packages/chain/mempool/typed_pool_test.go diff --git a/packages/chain/mempool/mempool.go b/packages/chain/mempool/mempool.go index 1bcc680a1c..3ec5004e40 100644 --- a/packages/chain/mempool/mempool.go +++ b/packages/chain/mempool/mempool.go @@ -117,9 +117,9 @@ type Mempool interface { type Settings struct { TTL time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed) MaxOffledgerInPool int - MaxOnledgerInPool int // TODO unused - MaxTimedInPool int // TODO unused - MaxOnledgerToPropose int // (including timed-requests) // TODO unused + MaxOnledgerInPool int + MaxTimedInPool int + MaxOnledgerToPropose int // (including timed-requests) MaxOffledgerToPropose int } @@ -130,7 +130,7 @@ type RequestPool[V isc.Request] interface { Remove(request V) // this removes requests from the pool if predicate returns false Cleanup(predicate func(request V, ts time.Time) bool) - Iterate(f func(e *typedPoolEntry[V])) + Iterate(f func(e *typedPoolEntry[V]) bool) StatusString() string WriteContent(io.Writer) } @@ -241,8 +241,8 @@ func New( mpi := &mempoolImpl{ chainID: chainID, tangleTime: time.Time{}, - timePool: NewTimePool(metrics.SetTimePoolSize, log.Named("TIM")), - onLedgerPool: NewTypedPool[isc.OnLedgerRequest](waitReq, metrics.SetOnLedgerPoolSize, metrics.SetOnLedgerReqTime, log.Named("ONL")), + timePool: NewTimePool(settings.MaxTimedInPool, metrics.SetTimePoolSize, log.Named("TIM")), + onLedgerPool: NewTypedPool[isc.OnLedgerRequest](settings.MaxOnledgerInPool, waitReq, metrics.SetOnLedgerPoolSize, metrics.SetOnLedgerReqTime, log.Named("ONL")), offLedgerPool: NewOffledgerPool(settings.MaxOffledgerInPool, waitReq, metrics.SetOffLedgerPoolSize, metrics.SetOffLedgerReqTime, log.Named("OFF")), chainHeadAO: nil, serverNodesUpdatedPipe: pipe.NewInfinitePipe[*reqServerNodesUpdated](), @@ -606,14 +606,19 @@ func (mpi *mempoolImpl) refsToPropose(consensusID consGR.ConsensusID) []*isc.Req // The case for matching ChainHeadAO and request BaseAO reqRefs := []*isc.RequestRef{} if !mpi.tangleTime.IsZero() { // Wait for tangle-time to 
process the on ledger requests. - mpi.onLedgerPool.Cleanup(func(request isc.OnLedgerRequest, _ time.Time) bool { - if isc.RequestIsExpired(request, mpi.tangleTime) { - return false // Drop it from the mempool + mpi.onLedgerPool.Iterate(func(e *typedPoolEntry[isc.OnLedgerRequest]) bool { + if isc.RequestIsExpired(e.req, mpi.tangleTime) { + mpi.onLedgerPool.Remove(e.req) // Drop it from the mempool + return true } - if len(reqRefs) < mpi.settings.MaxOnledgerToPropose && isc.RequestIsUnlockable(request, mpi.chainID.AsAddress(), mpi.tangleTime) { - reqRefs = append(reqRefs, isc.RequestRefFromRequest(request)) + if isc.RequestIsUnlockable(e.req, mpi.chainID.AsAddress(), mpi.tangleTime) { + reqRefs = append(reqRefs, isc.RequestRefFromRequest(e.req)) + e.proposedFor = append(e.proposedFor, consensusID) + } + if len(reqRefs) >= mpi.settings.MaxOnledgerToPropose { + return false } - return true // Keep them for now + return true }) } @@ -821,8 +826,9 @@ func (mpi *mempoolImpl) handleTangleTimeUpdated(tangleTime time.Time) { // // Notify existing on-ledger requests if that's first time update. if oldTangleTime.IsZero() { - mpi.onLedgerPool.Iterate(func(e *typedPoolEntry[isc.OnLedgerRequest]) { + mpi.onLedgerPool.Iterate(func(e *typedPoolEntry[isc.OnLedgerRequest]) bool { mpi.waitReq.MarkAvailable(e.req) + return true }) } } @@ -835,9 +841,6 @@ func (mpi *mempoolImpl) handleTrackNewChainHead(req *reqTrackNewChainHead) { defer close(req.responseCh) mpi.log.Debugf("handleTrackNewChainHead, %v from %v, current=%v", req.till, req.from, mpi.chainHeadAO) - // update defaultGasPrice for offLedger requests - mpi.offLedgerPool.SetMinGasPrice(governance.NewStateAccess(mpi.chainHeadState).DefaultGasPrice()) - if len(req.removed) != 0 { mpi.log.Infof("Reorg detected, removing %v blocks, adding %v blocks", len(req.removed), len(req.added)) // TODO: For IOTA 2.0: Maybe re-read the state from L1 (when reorgs will become possible). 
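The hunk below re-adds the SetMinGasPrice call after chainHeadState has been updated, so the price floor is always derived from the state the chain head just advanced to. Because entries without an explicit gas price inherit that floor, changing it reshuffles the effective price order, which is why SetMinGasPrice re-sorts orderedByGasPrice. A self-contained sketch of that behaviour, using simplified stand-in types (entry and pool are illustrative, not the wasp types):

package main

import (
	"fmt"
	"math/big"
	"slices"
)

type entry struct {
	id    string
	price *big.Int // nil means "pays whatever the default price is"
}

type pool struct {
	minGasPrice *big.Int
	ordered     []*entry // kept sorted, cheapest first
}

func (p *pool) effectivePrice(e *entry) *big.Int {
	if e.price != nil {
		return e.price
	}
	return p.minGasPrice
}

func (p *pool) setMinGasPrice(newPrice *big.Int) {
	if p.minGasPrice.Cmp(newPrice) == 0 {
		return // no change, the existing order is still valid
	}
	p.minGasPrice = newPrice
	// entries with a nil price just changed their effective price,
	// so the whole order must be rebuilt
	slices.SortFunc(p.ordered, func(a, b *entry) int {
		return p.effectivePrice(a).Cmp(p.effectivePrice(b))
	})
}

func main() {
	p := &pool{minGasPrice: big.NewInt(10)}
	p.ordered = []*entry{
		{id: "default-price", price: nil},
		{id: "explicit-15", price: big.NewInt(15)},
		{id: "explicit-20", price: big.NewInt(20)},
	}
	p.setMinGasPrice(big.NewInt(17)) // "default-price" now outbids "explicit-15"
	for _, e := range p.ordered {
		fmt.Println(e.id, p.effectivePrice(e)) // explicit-15 15, default-price 17, explicit-20 20
	}
}

Keeping the list sorted cheapest-first is also what lets LimitPoolSize drop the lowest-priced transactions first when the pool overflows.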
@@ -905,6 +908,9 @@ func (mpi *mempoolImpl) handleTrackNewChainHead(req *reqTrackNewChainHead) { } mpi.waitChainHead = newWaitChainHead } + + // update defaultGasPrice for offLedger requests + mpi.offLedgerPool.SetMinGasPrice(governance.NewStateAccess(mpi.chainHeadState).DefaultGasPrice()) } func (mpi *mempoolImpl) handleNetMessage(recv *peering.PeerMessageIn) { diff --git a/packages/chain/mempool/mempool_test.go b/packages/chain/mempool/mempool_test.go index f0024c2642..3e16fa245c 100644 --- a/packages/chain/mempool/mempool_test.go +++ b/packages/chain/mempool/mempool_test.go @@ -516,14 +516,14 @@ func TestMempoolsNonceGaps(t *testing.T) { askProposalExpectReqs := func(ao *isc.AliasOutputWithID, reqs ...isc.Request) *isc.AliasOutputWithID { t.Log("Ask for proposals") - proposals := make([]<-chan []*isc.RequestRef, len(te.mempools)) + proposalCh := make([]<-chan []*isc.RequestRef, len(te.mempools)) for i, node := range te.mempools { - proposals[i] = node.ConsensusProposalAsync(te.ctx, ao, consGR.ConsensusID{}) + proposalCh[i] = node.ConsensusProposalAsync(te.ctx, ao, consGR.ConsensusID{}) } t.Log("Wait for proposals and ask for decided requests") decided := make([]<-chan []isc.Request, len(te.mempools)) for i, node := range te.mempools { - proposal := <-proposals[i] + proposal := <-proposalCh[i] require.Len(t, proposal, len(reqs)) decided[i] = node.ConsensusRequestsAsync(te.ctx, proposal) } diff --git a/packages/chain/mempool/offledger_pool.go b/packages/chain/mempool/offledger_pool.go index 07751dc755..ef06097874 100644 --- a/packages/chain/mempool/offledger_pool.go +++ b/packages/chain/mempool/offledger_pool.go @@ -10,6 +10,8 @@ import ( "slices" "time" + "github.com/samber/lo" + "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/logger" consGR "github.com/iotaledger/wasp/packages/chain/cons/cons_gr" @@ -77,13 +79,6 @@ func (p *OffLedgerPool) Add(request isc.OffLedgerRequest) { return // not added already exists } - // update metrics and signal that the request is available, once this function ends - defer func() { - p.log.Debugf("ADD %v as key=%v, senderAccount: %s", request.ID(), ref, account) - p.sizeMetric(p.refLUT.Size()) - p.waitReq.MarkAvailable(request) - }() - // // add to the account requests, keep the slice ordered { @@ -115,7 +110,7 @@ func (p *OffLedgerPool) Add(request isc.OffLedgerRequest) { reqsForAcount = append(reqsForAcount, entry) // add to the end of the list (thus extending the array) // make room if target position is not at the end - if index != len(reqsForAcount)+1 { + if index != len(reqsForAcount)-1 { copy(reqsForAcount[index+1:], reqsForAcount[index:]) reqsForAcount[index] = entry } @@ -129,20 +124,30 @@ func (p *OffLedgerPool) Add(request isc.OffLedgerRequest) { index, _ := slices.BinarySearchFunc(p.orderedByGasPrice, entry, p.reqSort) p.orderedByGasPrice = append(p.orderedByGasPrice, entry) // make room if target position is not at the end - if index != len(p.orderedByGasPrice) { + if index != len(p.orderedByGasPrice)-1 { copy(p.orderedByGasPrice[index+1:], p.orderedByGasPrice[index:]) p.orderedByGasPrice[index] = entry } } - p.LimitPoolSize() + // keep the pool size in check + deleted := p.LimitPoolSize() + if lo.Contains(deleted, entry) { + // this exact request was deleted from the pool, do not update metrics, or mark available + return + } + + // + // update metrics and signal that the request is available + p.log.Debugf("ADD %v as key=%v, senderAccount: %s", request.ID(), ref, account) + p.sizeMetric(p.refLUT.Size()) + 
p.waitReq.MarkAvailable(request) } // LimitPoolSize drops the txs with the lowest price if the total number of requests is too big -func (p *OffLedgerPool) LimitPoolSize() { - // TODO apply a similar limit to on-ledger/time pool (it cannot be unbound) +func (p *OffLedgerPool) LimitPoolSize() []*OrderedPoolEntry { if len(p.orderedByGasPrice) <= p.maxPoolSize { - return // nothing to do + return nil // nothing to do } totalToDelete := len(p.orderedByGasPrice) - p.maxPoolSize @@ -162,6 +167,7 @@ func (p *OffLedgerPool) LimitPoolSize() { p.log.Debugf("LimitPoolSize dropping request: %v", r.req.ID()) p.Remove(r.req) } + return reqsToDelete } func (p *OffLedgerPool) GasPrice(e *OrderedPoolEntry) *big.Int { diff --git a/packages/chain/mempool/time_pool.go b/packages/chain/mempool/time_pool.go index 48f4291823..72331fbc08 100644 --- a/packages/chain/mempool/time_pool.go +++ b/packages/chain/mempool/time_pool.go @@ -12,8 +12,6 @@ import ( "github.com/iotaledger/wasp/packages/isc" ) -// TODO limit this (sort by timelock) - // Maintains a pool of requests that have to be postponed until specified timestamp. type TimePool interface { AddRequest(timestamp time.Time, request isc.OnLedgerRequest) @@ -26,10 +24,11 @@ type TimePool interface { // The list is organized in slots. Each slot contains a list of requests that fit to the // slot boundaries. type timePoolImpl struct { - requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.OnLedgerRequest] // All the requests in this pool. - slots *timeSlot // Structure to fetch them fast by their time. - sizeMetric func(int) - log *logger.Logger + requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.OnLedgerRequest] // All the requests in this pool. + slots *timeSlot // Structure to fetch them fast by their time. + maxPoolSize int + sizeMetric func(int) + log *logger.Logger } type timeSlot struct { @@ -43,12 +42,13 @@ const slotPrecision = time.Minute var _ TimePool = &timePoolImpl{} -func NewTimePool(sizeMetric func(int), log *logger.Logger) TimePool { +func NewTimePool(maxTimedInPool int, sizeMetric func(int), log *logger.Logger) TimePool { return &timePoolImpl{ - requests: shrinkingmap.New[isc.RequestRefKey, isc.OnLedgerRequest](), - slots: nil, - sizeMetric: sizeMetric, - log: log, + requests: shrinkingmap.New[isc.RequestRefKey, isc.OnLedgerRequest](), + slots: nil, + maxPoolSize: maxTimedInPool, + sizeMetric: sizeMetric, + log: log, } } @@ -59,10 +59,9 @@ func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.OnLedgerReq return } - if tpi.requests.Set(reqRefKey, request) { - tpi.log.Debugf("ADD %v as key=%v", request.ID(), reqRefKey) + if !tpi.requests.Set(reqRefKey, request) { + return } - tpi.sizeMetric(tpi.requests.Size()) reqFrom, reqTill := tpi.timestampSlotBounds(timestamp) prevNext := &tpi.slots @@ -78,16 +77,51 @@ func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.OnLedgerReq next: slot, } *prevNext = newSlot - return + break } if slot.from == reqFrom { // Add to existing slot. 
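+			// all requests that share this exact timestamp are kept in a single
+			// slice under one key of the slot's map; GetOrCreate seeds an empty
+			// slice on first use, so the append below works in both cases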
requests, _ := slot.reqs.GetOrCreate(timestamp, func() []isc.OnLedgerRequest { return make([]isc.OnLedgerRequest, 0, 1) }) slot.reqs.Set(timestamp, append(requests, request)) - return + break } prevNext = &slot.next slot = slot.next } + + // + // keep the size of this pool limited + if tpi.requests.Size() > tpi.maxPoolSize { + // remove the slot most far out in the future + var prev *timeSlot + lastSlot := tpi.slots + for { + if lastSlot.next == nil { + break + } + prev = lastSlot + lastSlot = lastSlot.next + } + + // remove the link to the lastSlot + if prev == nil { + tpi.slots = nil + } else { + prev.next = nil + } + + // delete the requests included in the last slot + reqsToDelete := lastSlot.reqs.Values() + for _, reqs := range reqsToDelete { + for _, req := range reqs { + rKey := isc.RequestRefFromRequest(req).AsKey() + tpi.requests.Delete(rKey) + } + } + } + + // log and update metrics + tpi.log.Debugf("ADD %v as key=%v", request.ID(), reqRefKey) + tpi.sizeMetric(tpi.requests.Size()) } func (tpi *timePoolImpl) TakeTill(timestamp time.Time) []isc.OnLedgerRequest { @@ -124,7 +158,6 @@ func (tpi *timePoolImpl) Has(reqRef *isc.RequestRef) bool { } func (tpi *timePoolImpl) Cleanup(predicate func(request isc.OnLedgerRequest, ts time.Time) bool) { - // TODO iterate using an order (gas price) prevNext := &tpi.slots for slot := tpi.slots; slot != nil; slot = slot.next { slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.OnLedgerRequest) bool { diff --git a/packages/chain/mempool/time_pool_test.go b/packages/chain/mempool/time_pool_test.go index 8a88753fda..ecd55cf4f7 100644 --- a/packages/chain/mempool/time_pool_test.go +++ b/packages/chain/mempool/time_pool_test.go @@ -20,7 +20,7 @@ import ( func TestTimePoolBasic(t *testing.T) { log := testlogger.NewLogger(t) - tp := mempool.NewTimePool(func(i int) {}, log) + tp := mempool.NewTimePool(1000, func(i int) {}, log) t0 := time.Now() t1 := t0.Add(17 * time.Nanosecond) t2 := t0.Add(17 * time.Minute) @@ -98,7 +98,7 @@ var _ rapid.StateMachine = &timePoolSM{} func newtimePoolSM(t *rapid.T) *timePoolSM { sm := new(timePoolSM) log := testlogger.NewLogger(t) - sm.tp = mempool.NewTimePool(func(i int) {}, log) + sm.tp = mempool.NewTimePool(1000, func(i int) {}, log) sm.kp = cryptolib.NewKeyPair() sm.added = 0 sm.taken = 0 @@ -122,3 +122,35 @@ func (sm *timePoolSM) TakeTill(t *rapid.T) { res := sm.tp.TakeTill(ts) sm.taken += len(res) } + +func TestTimePoolLimit(t *testing.T) { + log := testlogger.NewLogger(t) + size := 0 + tp := mempool.NewTimePool(3, func(newSize int) { size = newSize }, log) + t0 := time.Now().Add(4 * time.Hour) + t1 := time.Now().Add(3 * time.Hour) + t2 := time.Now().Add(2 * time.Hour) + t3 := time.Now().Add(1 * time.Hour) + + r0, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(0)) + require.NoError(t, err) + r1, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(1)) + require.NoError(t, err) + r2, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(2)) + require.NoError(t, err) + r3, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(3)) + require.NoError(t, err) + + tp.AddRequest(t0, r0) + tp.AddRequest(t1, r1) + tp.AddRequest(t2, r2) + tp.AddRequest(t3, r3) + + require.Equal(t, 3, size) + + // assert t0 was removed (the request scheduled to the latest time in the future) + require.False(t, tp.Has(isc.RequestRefFromRequest(r0))) + require.True(t, tp.Has(isc.RequestRefFromRequest(r1))) + require.True(t, tp.Has(isc.RequestRefFromRequest(r2))) + require.True(t, 
tp.Has(isc.RequestRefFromRequest(r3))) +} diff --git a/packages/chain/mempool/typed_pool.go b/packages/chain/mempool/typed_pool.go index 51b6967747..b9214f814b 100644 --- a/packages/chain/mempool/typed_pool.go +++ b/packages/chain/mempool/typed_pool.go @@ -6,40 +6,47 @@ package mempool import ( "fmt" "io" + "slices" "time" + "github.com/samber/lo" + "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/logger" + consGR "github.com/iotaledger/wasp/packages/chain/cons/cons_gr" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/kv/codec" ) -// TODO limit this list. -// TODO add gas to on-ledger requests +// TODO add gas price to on-ledger requests // TODO this list needs to be periodically re-filled from L1 once the activity is lower - type typedPool[V isc.Request] struct { - waitReq WaitReq - requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]] - sizeMetric func(int) - timeMetric func(time.Duration) - log *logger.Logger + waitReq WaitReq + requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]] + ordered []*typedPoolEntry[V] // TODO use a better data structure instead!!! (probably RedBlackTree) + maxPoolSize int + sizeMetric func(int) + timeMetric func(time.Duration) + log *logger.Logger } type typedPoolEntry[V isc.Request] struct { - req V - ts time.Time + req V + proposedFor []consGR.ConsensusID + ts time.Time } var _ RequestPool[isc.OffLedgerRequest] = &typedPool[isc.OffLedgerRequest]{} -func NewTypedPool[V isc.Request](waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) RequestPool[V] { +func NewTypedPool[V isc.Request](maxOnledgerInPool int, waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) RequestPool[V] { return &typedPool[V]{ - waitReq: waitReq, - requests: shrinkingmap.New[isc.RequestRefKey, *typedPoolEntry[V]](), - sizeMetric: sizeMetric, - timeMetric: timeMetric, - log: log, + waitReq: waitReq, + requests: shrinkingmap.New[isc.RequestRefKey, *typedPoolEntry[V]](), + ordered: []*typedPoolEntry[V]{}, + maxPoolSize: maxOnledgerInPool, + sizeMetric: sizeMetric, + timeMetric: timeMetric, + log: log, } } @@ -57,43 +64,128 @@ func (olp *typedPool[V]) Get(reqRef *isc.RequestRef) V { func (olp *typedPool[V]) Add(request V) { refKey := isc.RequestRefFromRequest(request).AsKey() - if olp.requests.Set(refKey, &typedPoolEntry[V]{req: request, ts: time.Now()}) { - olp.log.Debugf("ADD %v as key=%v", request.ID(), refKey) - olp.sizeMetric(olp.requests.Size()) + entry := &typedPoolEntry[V]{ + req: request, + ts: time.Now(), + proposedFor: []consGR.ConsensusID{}, + } + if !olp.requests.Set(refKey, entry) { + return // already in pool } + + // + // add the to the ordered list of requests + { + index, _ := slices.BinarySearchFunc(olp.ordered, entry, olp.reqSort) + olp.ordered = append(olp.ordered, entry) + // make room if target position is not at the end + if index != len(olp.ordered)-1 { + copy(olp.ordered[index+1:], olp.ordered[index:]) + olp.ordered[index] = entry + } + } + + // keep the pool size in check + deleted := olp.LimitPoolSize() + if lo.Contains(deleted, entry) { + // this exact request was deleted from the pool, do not update metrics, or mark available + return + } + + // + // update metrics and signal that the request is available + olp.log.Debugf("ADD %v as key=%v", request.ID(), refKey) + olp.sizeMetric(olp.requests.Size()) olp.waitReq.MarkAvailable(request) } +// LimitPoolSize drops the txs with the lowest 
price if the total number of requests is too big +func (olp *typedPool[V]) LimitPoolSize() []*typedPoolEntry[V] { + if len(olp.ordered) <= olp.maxPoolSize { + return nil // nothing to do + } + + totalToDelete := len(olp.ordered) - olp.maxPoolSize + reqsToDelete := make([]*typedPoolEntry[V], totalToDelete) + j := 0 + for i := 0; i < len(olp.ordered); i++ { + if len(olp.ordered[i].proposedFor) > 0 { + continue // we cannot drop requests that are being used in current consensus instances + } + reqsToDelete[j] = olp.ordered[i] + if j >= totalToDelete-1 { + break + } + } + + for _, r := range reqsToDelete { + olp.log.Debugf("LimitPoolSize dropping request: %v", r.req.ID()) + olp.Remove(r.req) + } + return reqsToDelete +} + +func (olp *typedPool[V]) reqSort(a, b *typedPoolEntry[V]) int { + // TODO use gas price to sort here, once on-ledger requests have a gas price field + // use requestID as a fallback in case of matching gas price (it's random and should give roughly the same order between nodes) + aID := a.req.ID() + bID := b.req.ID() + for i := range aID { + if aID[i] == bID[i] { + continue + } + if aID[i] > bID[i] { + return 1 + } + return -1 + } + return 0 +} + func (olp *typedPool[V]) Remove(request V) { refKey := isc.RequestRefFromRequest(request).AsKey() - if entry, ok := olp.requests.Get(refKey); ok { - if olp.requests.Delete(refKey) { - olp.log.Debugf("DEL %v as key=%v", request.ID(), refKey) - } - olp.sizeMetric(olp.requests.Size()) - olp.timeMetric(time.Since(entry.ts)) + entry, ok := olp.requests.Get(refKey) + if !ok { + return + } + if !olp.requests.Delete(refKey) { + return + } + + // + // find and delete the request from the ordered list + { + indexToDel := slices.IndexFunc(olp.ordered, func(e *typedPoolEntry[V]) bool { + return refKey == isc.RequestRefFromRequest(e.req).AsKey() + }) + olp.ordered[indexToDel] = nil // remove the pointer reference to allow GC of the entry object + olp.ordered = slices.Delete(olp.ordered, indexToDel, indexToDel+1) } + + // log and update metrics + olp.log.Debugf("DEL %v as key=%v", request.ID(), refKey) + olp.sizeMetric(olp.requests.Size()) + olp.timeMetric(time.Since(entry.ts)) } func (olp *typedPool[V]) Cleanup(predicate func(request V, ts time.Time) bool) { - // TODO iterate using an order (gas price) olp.requests.ForEach(func(refKey isc.RequestRefKey, entry *typedPoolEntry[V]) bool { if !predicate(entry.req, entry.ts) { - if olp.requests.Delete(refKey) { - olp.log.Debugf("DEL %v as key=%v", entry.req.ID(), refKey) - olp.timeMetric(time.Since(entry.ts)) - } + olp.Remove(entry.req) } return true }) olp.sizeMetric(olp.requests.Size()) } -func (olp *typedPool[V]) Iterate(f func(e *typedPoolEntry[V])) { - olp.requests.ForEach(func(refKey isc.RequestRefKey, entry *typedPoolEntry[V]) bool { - f(entry) - return true - }) +func (olp *typedPool[V]) Iterate(f func(e *typedPoolEntry[V]) bool) { + orderedCopy := slices.Clone(olp.ordered) + for _, entry := range orderedCopy { + if !f(entry) { + break + } + } + olp.sizeMetric(olp.requests.Size()) } diff --git a/packages/chain/mempool/typed_pool_test.go b/packages/chain/mempool/typed_pool_test.go new file mode 100644 index 0000000000..a12a1ae8f2 --- /dev/null +++ b/packages/chain/mempool/typed_pool_test.go @@ -0,0 +1,39 @@ +// Copyright 2020 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +package mempool + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + iotago "github.com/iotaledger/iota.go/v3" + "github.com/iotaledger/iota.go/v3/tpkg" + 
"github.com/iotaledger/wasp/packages/isc" + "github.com/iotaledger/wasp/packages/testutil/testlogger" +) + +func TestTypedMempoolPoolLimit(t *testing.T) { + waitReq := NewWaitReq(waitRequestCleanupEvery) + poolSizeLimit := 3 + size := 0 + pool := NewTypedPool[isc.OnLedgerRequest](poolSizeLimit, waitReq, func(newSize int) { size = newSize }, func(time.Duration) {}, testlogger.NewSilentLogger("", true)) + + r0, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(0)) + require.NoError(t, err) + r1, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(1)) + require.NoError(t, err) + r2, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(2)) + require.NoError(t, err) + r3, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(3)) + require.NoError(t, err) + + pool.Add(r0) + pool.Add(r1) + pool.Add(r2) + pool.Add(r3) + + require.Equal(t, 3, size) +} diff --git a/packages/evm/jsonrpc/jsonrpctest/env.go b/packages/evm/jsonrpc/jsonrpctest/env.go index 17d6111dd7..ca4a97e4a7 100644 --- a/packages/evm/jsonrpc/jsonrpctest/env.go +++ b/packages/evm/jsonrpc/jsonrpctest/env.go @@ -378,6 +378,19 @@ func (e *Env) TestRPCGasLimitTooLow() { func (e *Env) TestGasPrice() { gasPrice := e.MustGetGasPrice() require.NotZero(e.T, gasPrice.Uint64()) + + // assert sending txs with lower than set gas is not allowed + from, _ := e.NewAccountWithL2Funds() + tx, err := types.SignTx( + types.NewTransaction(0, common.Address{}, big.NewInt(123), math.MaxUint64, new(big.Int).Sub(gasPrice, big.NewInt(1)), nil), + e.Signer(), + from, + ) + require.NoError(e.T, err) + + _, err = e.SendTransactionAndWait(tx) + require.Error(e.T, err) + require.Regexp(e.T, `insufficient gas price: got \d+, minimum is \d+`, err.Error()) } func (e *Env) TestRPCAccessHistoricalState() { diff --git a/packages/vm/core/governance/stateaccess.go b/packages/vm/core/governance/stateaccess.go index 78488c99a5..43cea3e763 100644 --- a/packages/vm/core/governance/stateaccess.go +++ b/packages/vm/core/governance/stateaccess.go @@ -11,6 +11,7 @@ import ( "github.com/iotaledger/wasp/packages/kv" "github.com/iotaledger/wasp/packages/kv/codec" "github.com/iotaledger/wasp/packages/kv/subrealm" + "github.com/iotaledger/wasp/packages/parameters" ) type StateAccess struct { @@ -69,6 +70,5 @@ func (sa *StateAccess) GetBlockKeepAmount() int32 { } func (sa *StateAccess) DefaultGasPrice() *big.Int { - // TODO return the equivalent price defined by the feepolicy - panic("TODO implement DefaultGasPrice") + return MustGetGasFeePolicy(sa.state).DefaultGasPriceFullDecimals(parameters.L1().BaseToken.Decimals) } From 3311201ab2409ccdda2ca7bf8c85ae3989694b28 Mon Sep 17 00:00:00 2001 From: Jorge Silva Date: Tue, 23 Apr 2024 11:05:00 +0100 Subject: [PATCH 5/7] refresh on-ledger requests after congestion makes mempool drop them --- components/chains/component.go | 13 +++--- components/chains/params.go | 37 ++++++++--------- packages/chain/mempool/mempool.go | 40 +++++++++--------- packages/chain/mempool/mempool_test.go | 2 + packages/chain/mempool/time_pool.go | 36 ++++++++++++----- packages/chain/mempool/typed_pool.go | 56 +++++++++++++++++++------- packages/chain/node.go | 4 ++ packages/chain/node_test.go | 7 ++++ packages/nodeconn/nc_chain.go | 31 ++++++++------ packages/nodeconn/nodeconn.go | 9 +++++ 10 files changed, 156 insertions(+), 79 deletions(-) diff --git a/components/chains/component.go b/components/chains/component.go index 46905bf975..2eae037055 100644 --- a/components/chains/component.go +++ 
b/components/chains/component.go @@ -129,12 +129,13 @@ func provide(c *dig.Container) error { deps.ConsensusStateRegistry, deps.ChainListener, mempool.Settings{ - TTL: ParamsChains.MempoolTTL, - MaxOffledgerInPool: ParamsChains.MempoolMaxOffledgerInPool, - MaxOnledgerInPool: ParamsChains.MempoolMaxOnledgerInPool, - MaxTimedInPool: ParamsChains.MempoolMaxTimedInPool, - MaxOnledgerToPropose: ParamsChains.MempoolMaxOnledgerToPropose, - MaxOffledgerToPropose: ParamsChains.MempoolMaxOffledgerToPropose, + TTL: ParamsChains.MempoolTTL, + OnLedgerRefreshMinInterval: ParamsChains.MempoolOnLedgerRefreshMinInterval, + MaxOffledgerInPool: ParamsChains.MempoolMaxOffledgerInPool, + MaxOnledgerInPool: ParamsChains.MempoolMaxOnledgerInPool, + MaxTimedInPool: ParamsChains.MempoolMaxTimedInPool, + MaxOnledgerToPropose: ParamsChains.MempoolMaxOnledgerToPropose, + MaxOffledgerToPropose: ParamsChains.MempoolMaxOffledgerToPropose, }, ParamsChains.BroadcastInterval, shutdown.NewCoordinator("chains", Component.Logger().Named("Shutdown")), diff --git a/components/chains/params.go b/components/chains/params.go index 697e52f574..59496ef459 100644 --- a/components/chains/params.go +++ b/components/chains/params.go @@ -7,24 +7,25 @@ import ( ) type ParametersChains struct { - BroadcastUpToNPeers int `default:"2" usage:"number of peers an offledger request is broadcasted to"` - BroadcastInterval time.Duration `default:"0s" usage:"time between re-broadcast of offledger requests; 0 value means that re-broadcasting is disabled"` - APICacheTTL time.Duration `default:"300s" usage:"time to keep processed offledger requests in api cache"` - PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"` - DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."` - PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."` - ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."` - RecoveryTimeout time.Duration `default:"20s" usage:"Time after which another consensus attempt is made."` - RedeliveryPeriod time.Duration `default:"2s" usage:"the resend period for msg."` - PrintStatusPeriod time.Duration `default:"3s" usage:"the period to print consensus instance status."` - ConsensusInstsInAdvance int `default:"3" usage:""` - AwaitReceiptCleanupEvery int `default:"100" usage:"for every this number AwaitReceipt will be cleaned up"` - MempoolTTL time.Duration `default:"24h" usage:"Time that requests are allowed to sit in the mempool without being processed"` - MempoolMaxOffledgerInPool int `default:"2000" usage:"Maximum number of off-ledger requests kept in the mempool"` - MempoolMaxOnledgerInPool int `default:"1000" usage:"Maximum number of on-ledger requests kept in the mempool"` - MempoolMaxTimedInPool int `default:"100" usage:"Maximum number of timed on-ledger requests kept in the mempool"` - MempoolMaxOffledgerToPropose int `default:"500" usage:"Maximum number of off-ledger requests to propose for the next block"` - MempoolMaxOnledgerToPropose int `default:"100" usage:"Maximum number of on-ledger requests to propose for the next block (includes timed requests)"` + BroadcastUpToNPeers int `default:"2" usage:"number of peers an offledger request is broadcasted to"` + BroadcastInterval time.Duration `default:"0s" usage:"time between re-broadcast of offledger requests; 0 value 
means that re-broadcasting is disabled"` + APICacheTTL time.Duration `default:"300s" usage:"time to keep processed offledger requests in api cache"` + PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"` + DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."` + PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."` + ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."` + RecoveryTimeout time.Duration `default:"20s" usage:"Time after which another consensus attempt is made."` + RedeliveryPeriod time.Duration `default:"2s" usage:"the resend period for msg."` + PrintStatusPeriod time.Duration `default:"3s" usage:"the period to print consensus instance status."` + ConsensusInstsInAdvance int `default:"3" usage:""` + AwaitReceiptCleanupEvery int `default:"100" usage:"for every this number AwaitReceipt will be cleaned up"` + MempoolTTL time.Duration `default:"24h" usage:"Time that requests are allowed to sit in the mempool without being processed"` + MempoolMaxOffledgerInPool int `default:"2000" usage:"Maximum number of off-ledger requests kept in the mempool"` + MempoolMaxOnledgerInPool int `default:"1000" usage:"Maximum number of on-ledger requests kept in the mempool"` + MempoolMaxTimedInPool int `default:"100" usage:"Maximum number of timed on-ledger requests kept in the mempool"` + MempoolMaxOffledgerToPropose int `default:"500" usage:"Maximum number of off-ledger requests to propose for the next block"` + MempoolMaxOnledgerToPropose int `default:"100" usage:"Maximum number of on-ledger requests to propose for the next block (includes timed requests)"` + MempoolOnLedgerRefreshMinInterval time.Duration `default:"10m" usage:"Minimum interval to try to refresh the list of on-ledger requests after some have been dropped from the pool (this interval is introduced to avoid dropping/refreshing cycle if there are too many requests on L1 to process)"` } type ParametersWAL struct { diff --git a/packages/chain/mempool/mempool.go b/packages/chain/mempool/mempool.go index 3ec5004e40..53d684e477 100644 --- a/packages/chain/mempool/mempool.go +++ b/packages/chain/mempool/mempool.go @@ -115,24 +115,13 @@ type Mempool interface { } type Settings struct { - TTL time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed) - MaxOffledgerInPool int - MaxOnledgerInPool int - MaxTimedInPool int - MaxOnledgerToPropose int // (including timed-requests) - MaxOffledgerToPropose int -} - -type RequestPool[V isc.Request] interface { - Has(reqRef *isc.RequestRef) bool - Get(reqRef *isc.RequestRef) V - Add(request V) - Remove(request V) - // this removes requests from the pool if predicate returns false - Cleanup(predicate func(request V, ts time.Time) bool) - Iterate(f func(e *typedPoolEntry[V]) bool) - StatusString() string - WriteContent(io.Writer) + TTL time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed) + OnLedgerRefreshMinInterval time.Duration + MaxOffledgerInPool int + MaxOnledgerInPool int + MaxTimedInPool int + MaxOnledgerToPropose int // (including timed-requests) + MaxOffledgerToPropose int } // This implementation tracks single branch of the chain only. I.e. 
all the consensus @@ -175,6 +164,8 @@ type mempoolImpl struct { log *logger.Logger metrics *metrics.ChainMempoolMetrics listener ChainListener + refreshOnLedgerRequests func() + lastRefreshTimestamp time.Time } var _ Mempool = &mempoolImpl{} @@ -235,6 +226,7 @@ func New( listener ChainListener, settings Settings, broadcastInterval time.Duration, + refreshOnLedgerRequests func(), ) Mempool { netPeeringID := peering.HashPeeringIDFromBytes(chainID.Bytes(), []byte("Mempool")) // ChainID × Mempool waitReq := NewWaitReq(waitRequestCleanupEvery) @@ -269,6 +261,8 @@ func New( log: log, metrics: metrics, listener: listener, + refreshOnLedgerRequests: refreshOnLedgerRequests, + lastRefreshTimestamp: time.Now(), } pipeMetrics.TrackPipeLen("mp-serverNodesUpdatedPipe", mpi.serverNodesUpdatedPipe.Len) @@ -392,7 +386,7 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc) debugTicker := time.NewTicker(distShareDebugTick) timeTicker := time.NewTicker(distShareTimeTick) rePublishTicker := time.NewTicker(distShareRePublishTick) - forceCleanMempoolTicker := time.NewTicker(forceCleanMempoolTick) + forceCleanMempoolTicker := time.NewTicker(forceCleanMempoolTick) // this exists to force mempool cleanup on access nodes // thought: maybe access nodes shouldn't have a mempool at all for { select { case recv, ok := <-serverNodesUpdatedPipeOutCh: @@ -950,6 +944,14 @@ func (mpi *mempoolImpl) handleRePublishTimeTick() { } return true }) + + // periodically try to refresh On-ledger requests that might have been dropped + if time.Since(mpi.lastRefreshTimestamp) > mpi.settings.OnLedgerRefreshMinInterval { + if mpi.onLedgerPool.ShouldRefreshRequests() || mpi.timePool.ShouldRefreshRequests() { + mpi.refreshOnLedgerRequests() + mpi.lastRefreshTimestamp = time.Now() + } + } } func (mpi *mempoolImpl) handleForceCleanMempool() { diff --git a/packages/chain/mempool/mempool_test.go b/packages/chain/mempool/mempool_test.go index 3e16fa245c..06bcc10a43 100644 --- a/packages/chain/mempool/mempool_test.go +++ b/packages/chain/mempool/mempool_test.go @@ -688,6 +688,7 @@ func TestTTL(t *testing.T) { MaxOffledgerToPropose: 1000, }, 1*time.Second, + func() {}, ) defer te.close() start := time.Now() @@ -820,6 +821,7 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv { MaxOffledgerToPropose: 1000, }, 1*time.Second, + func() {}, ) } return te diff --git a/packages/chain/mempool/time_pool.go b/packages/chain/mempool/time_pool.go index 72331fbc08..0d277ec307 100644 --- a/packages/chain/mempool/time_pool.go +++ b/packages/chain/mempool/time_pool.go @@ -18,17 +18,19 @@ type TimePool interface { TakeTill(timestamp time.Time) []isc.OnLedgerRequest Has(reqID *isc.RequestRef) bool Cleanup(predicate func(request isc.OnLedgerRequest, ts time.Time) bool) + ShouldRefreshRequests() bool } // Here we implement TimePool. We maintain the request in a list ordered by a timestamp. // The list is organized in slots. Each slot contains a list of requests that fit to the // slot boundaries. type timePoolImpl struct { - requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.OnLedgerRequest] // All the requests in this pool. - slots *timeSlot // Structure to fetch them fast by their time. - maxPoolSize int - sizeMetric func(int) - log *logger.Logger + requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.OnLedgerRequest] // All the requests in this pool. + slots *timeSlot // Structure to fetch them fast by their time. 
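+	// set when the size limit forces requests to be dropped; consulted by
+	// ShouldRefreshRequests so that, once the pool has drained, the mempool
+	// can ask L1 for the pending on-ledger requests again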
+ hasDroppedRequests bool + maxPoolSize int + sizeMetric func(int) + log *logger.Logger } type timeSlot struct { @@ -44,11 +46,12 @@ var _ TimePool = &timePoolImpl{} func NewTimePool(maxTimedInPool int, sizeMetric func(int), log *logger.Logger) TimePool { return &timePoolImpl{ - requests: shrinkingmap.New[isc.RequestRefKey, isc.OnLedgerRequest](), - slots: nil, - maxPoolSize: maxTimedInPool, - sizeMetric: sizeMetric, - log: log, + requests: shrinkingmap.New[isc.RequestRefKey, isc.OnLedgerRequest](), + slots: nil, + hasDroppedRequests: false, + maxPoolSize: maxTimedInPool, + sizeMetric: sizeMetric, + log: log, } } @@ -117,6 +120,7 @@ func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.OnLedgerReq tpi.requests.Delete(rKey) } } + tpi.hasDroppedRequests = true } // log and update metrics @@ -124,6 +128,18 @@ func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.OnLedgerReq tpi.sizeMetric(tpi.requests.Size()) } +func (tpi *timePoolImpl) ShouldRefreshRequests() bool { + if !tpi.hasDroppedRequests { + return false + } + if tpi.requests.Size() > 0 { + return false // wait until pool is empty to refresh + } + // assume after this function returns true, the requests will be refreshed + tpi.hasDroppedRequests = false + return true +} + func (tpi *timePoolImpl) TakeTill(timestamp time.Time) []isc.OnLedgerRequest { resp := []isc.OnLedgerRequest{} for slot := tpi.slots; slot != nil; slot = slot.next { diff --git a/packages/chain/mempool/typed_pool.go b/packages/chain/mempool/typed_pool.go index b9214f814b..3dc2a3186d 100644 --- a/packages/chain/mempool/typed_pool.go +++ b/packages/chain/mempool/typed_pool.go @@ -18,16 +18,30 @@ import ( "github.com/iotaledger/wasp/packages/kv/codec" ) +type RequestPool[V isc.Request] interface { + Has(reqRef *isc.RequestRef) bool + Get(reqRef *isc.RequestRef) V + Add(request V) + Remove(request V) + // this removes requests from the pool if predicate returns false + Cleanup(predicate func(request V, ts time.Time) bool) + Iterate(f func(e *typedPoolEntry[V]) bool) + StatusString() string + WriteContent(io.Writer) + ShouldRefreshRequests() bool +} + // TODO add gas price to on-ledger requests // TODO this list needs to be periodically re-filled from L1 once the activity is lower type typedPool[V isc.Request] struct { - waitReq WaitReq - requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]] - ordered []*typedPoolEntry[V] // TODO use a better data structure instead!!! (probably RedBlackTree) - maxPoolSize int - sizeMetric func(int) - timeMetric func(time.Duration) - log *logger.Logger + waitReq WaitReq + requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]] + ordered []*typedPoolEntry[V] // TODO use a better data structure instead!!! 
(probably RedBlackTree) + hasDroppedRequests bool + maxPoolSize int + sizeMetric func(int) + timeMetric func(time.Duration) + log *logger.Logger } type typedPoolEntry[V isc.Request] struct { @@ -40,13 +54,14 @@ var _ RequestPool[isc.OffLedgerRequest] = &typedPool[isc.OffLedgerRequest]{} func NewTypedPool[V isc.Request](maxOnledgerInPool int, waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) RequestPool[V] { return &typedPool[V]{ - waitReq: waitReq, - requests: shrinkingmap.New[isc.RequestRefKey, *typedPoolEntry[V]](), - ordered: []*typedPoolEntry[V]{}, - maxPoolSize: maxOnledgerInPool, - sizeMetric: sizeMetric, - timeMetric: timeMetric, - log: log, + waitReq: waitReq, + requests: shrinkingmap.New[isc.RequestRefKey, *typedPoolEntry[V]](), + ordered: []*typedPoolEntry[V]{}, + hasDroppedRequests: false, + maxPoolSize: maxOnledgerInPool, + sizeMetric: sizeMetric, + timeMetric: timeMetric, + log: log, } } @@ -122,6 +137,7 @@ func (olp *typedPool[V]) LimitPoolSize() []*typedPoolEntry[V] { olp.log.Debugf("LimitPoolSize dropping request: %v", r.req.ID()) olp.Remove(r.req) } + olp.hasDroppedRequests = true return reqsToDelete } @@ -168,6 +184,18 @@ func (olp *typedPool[V]) Remove(request V) { olp.timeMetric(time.Since(entry.ts)) } +func (olp *typedPool[V]) ShouldRefreshRequests() bool { + if !olp.hasDroppedRequests { + return false + } + if olp.requests.Size() > 0 { + return false // wait until pool is empty to refresh + } + // assume after this function returns true, the requests will be refreshed + olp.hasDroppedRequests = false + return true +} + func (olp *typedPool[V]) Cleanup(predicate func(request V, ts time.Time) bool) { olp.requests.ForEach(func(refKey isc.RequestRefKey, entry *typedPoolEntry[V]) bool { if !predicate(entry.req, entry.ts) { diff --git a/packages/chain/node.go b/packages/chain/node.go index 5527225184..47ba2826f4 100644 --- a/packages/chain/node.go +++ b/packages/chain/node.go @@ -141,6 +141,8 @@ type ChainNodeConn interface { onChainConnect func(), onChainDisconnect func(), ) + // called if the mempoll has dropped some requests during congestion, and now the congestion stopped + RefreshOnLedgerRequests(ctx context.Context, chainID isc.ChainID) } type chainNodeImpl struct { @@ -433,7 +435,9 @@ func New( cni.listener, mempoolSettings, mempoolBroadcastInterval, + func() { nodeConn.RefreshOnLedgerRequests(ctx, chainID) }, ) + cni.chainMgr = gpa.NewAckHandler(cni.me, chainMgr.AsGPA(), RedeliveryPeriod) cni.stateMgr = stateMgr cni.mempool = mempool diff --git a/packages/chain/node_test.go b/packages/chain/node_test.go index 31b2c3f695..bb5f06d56b 100644 --- a/packages/chain/node_test.go +++ b/packages/chain/node_test.go @@ -282,6 +282,8 @@ type testNodeConn struct { attachWG *sync.WaitGroup } +var _ chain.NodeConnection = &testNodeConn{} + func newTestNodeConn(t *testing.T) *testNodeConn { tnc := &testNodeConn{ t: t, @@ -372,6 +374,11 @@ func (tnc *testNodeConn) GetL1ProtocolParams() *iotago.ProtocolParameters { return testparameters.GetL1ProtocolParamsForTesting() } +// RefreshOnLedgerRequests implements chain.NodeConnection. 
+func (tnc *testNodeConn) RefreshOnLedgerRequests(ctx context.Context, chainID isc.ChainID) { + // noop +} + //////////////////////////////////////////////////////////////////////////////// // testEnv diff --git a/packages/nodeconn/nc_chain.go b/packages/nodeconn/nc_chain.go index 1783e8cebe..7f080aedf4 100644 --- a/packages/nodeconn/nc_chain.go +++ b/packages/nodeconn/nc_chain.go @@ -56,7 +56,7 @@ type ncChain struct { ctx context.Context nodeConn *nodeConnection chainID isc.ChainID - requestOutputHandler func(iotago.MilestoneIndex, *isc.OutputInfo) + requestOutputHandler func(*isc.OutputInfo) aliasOutputHandler func(iotago.MilestoneIndex, *isc.OutputInfo) milestoneHandler func(iotago.MilestoneIndex, time.Time) @@ -100,8 +100,8 @@ func newNCChain( lastPendingTx: nil, } - chain.requestOutputHandler = func(milestoneIndex iotago.MilestoneIndex, outputInfo *isc.OutputInfo) { - chain.LogDebugf("applying request output: outputID: %s, milestoneIndex: %d, chainID: %s", outputInfo.OutputID.ToHex(), milestoneIndex, chainID) + chain.requestOutputHandler = func(outputInfo *isc.OutputInfo) { + chain.LogDebugf("applying request output: outputID: %s, , chainID: %s", outputInfo.OutputID.ToHex(), chainID) requestOutputHandler(outputInfo) } @@ -168,7 +168,7 @@ func (ncc *ncChain) applyPendingLedgerUpdates(ledgerIndex iotago.MilestoneIndex) switch update.Type { case pendingLedgerUpdateTypeRequest: - ncc.requestOutputHandler(update.LedgerIndex, update.Update.(*isc.OutputInfo)) + ncc.requestOutputHandler(update.Update.(*isc.OutputInfo)) case pendingLedgerUpdateTypeAlias: ncc.aliasOutputHandler(update.LedgerIndex, update.Update.(*isc.OutputInfo)) case pendingLedgerUpdateTypeMilestone: @@ -198,7 +198,7 @@ func (ncc *ncChain) HandleRequestOutput(ledgerIndex iotago.MilestoneIndex, outpu return } - ncc.requestOutputHandler(ledgerIndex, outputInfo) + ncc.requestOutputHandler(outputInfo) } func (ncc *ncChain) HandleAliasOutput(ledgerIndex iotago.MilestoneIndex, outputInfo *isc.OutputInfo) { @@ -669,6 +669,19 @@ func (ncc *ncChain) SyncChainStateWithL1(ctx context.Context) error { ncc.milestoneHandler(ledgerIndex, milestoneTimestamp) ncc.aliasOutputHandler(ledgerIndex, aliasOutput) + if err := ncc.refreshOwnedOutputs(ctx); err != nil { + return err + } + + if err := ncc.applyPendingLedgerUpdates(ledgerIndex); err != nil { + return err + } + + ncc.LogInfof("Synchronizing chain state and owned outputs for %s... done. (LedgerIndex: %d)", ncc.chainID, ledgerIndex) + return nil +} + +func (ncc *ncChain) refreshOwnedOutputs(ctx context.Context) error { // the indexer returns the outputs in sorted order by timestampBooked, // so we don't miss newly added outputs if the ledgerIndex increases during the query. // HINT: requests might be applied twice, if they are part of a pendingLedgerUpdate that overlaps with @@ -686,14 +699,8 @@ func (ncc *ncChain) SyncChainStateWithL1(ctx context.Context) error { } ncc.LogDebugf("received output, chainID: %s, outputID: %s", ncc.chainID, outputID.ToHex()) - ncc.requestOutputHandler(ledgerIndex, isc.NewOutputInfo(outputID, output, iotago.TransactionID{})) - } - - if err := ncc.applyPendingLedgerUpdates(ledgerIndex); err != nil { - return err + ncc.requestOutputHandler(isc.NewOutputInfo(outputID, output, iotago.TransactionID{})) } - - ncc.LogInfof("Synchronizing chain state and owned outputs for %s... done. 
(LedgerIndex: %d)", ncc.chainID, ledgerIndex) return nil } diff --git a/packages/nodeconn/nodeconn.go b/packages/nodeconn/nodeconn.go index 73fdee5258..72dcd486b3 100644 --- a/packages/nodeconn/nodeconn.go +++ b/packages/nodeconn/nodeconn.go @@ -81,6 +81,15 @@ type nodeConnection struct { shutdownHandler *shutdown.ShutdownHandler } +// RefreshOnLedgerRequests implements chain.NodeConnection. +func (nc *nodeConnection) RefreshOnLedgerRequests(ctx context.Context, chainID isc.ChainID) { + ncChain, ok := nc.chainsMap.Get(chainID) + if !ok { + panic("unexpected chainID") + } + ncChain.refreshOwnedOutputs(ctx) +} + func New( ctx context.Context, log *logger.Logger, From a5adb2805602ed08dc4110d64a08af873927d531 Mon Sep 17 00:00:00 2001 From: Jorge Silva Date: Tue, 23 Apr 2024 11:42:12 +0100 Subject: [PATCH 6/7] lint fix --- .golangci.yml | 2 +- packages/evm/solidity/abi.go | 2 +- packages/nodeconn/nodeconn.go | 4 ++- packages/trie/test/proof_test.go | 2 +- packages/trie/test/trie_test.go | 6 ++--- packages/trie/trie_kvstore.go | 2 +- packages/util/pipe/queue_test.go | 27 +++++++++---------- .../governance/governanceimpl/chaininfo.go | 2 +- .../controllers/chain/waitforrequest.go | 2 +- tools/schema/model/yaml/convert.go | 20 +++++++------- 10 files changed, 34 insertions(+), 35 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 119a19445c..f4bf8be97a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -145,7 +145,6 @@ linters: - errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases - exportloopref # Checks for pointers to enclosing loop variables. - funlen # Tool for detection of long functions. - - goconst # Finds repeated strings that could be replaced by a constant. - gocritic # Provides many diagnostics that check for bugs, performance and style issues. - gocyclo # Computes and checks the cyclomatic complexity of functions. - goerr113 # Golang linter to check the errors handling expressions. @@ -186,6 +185,7 @@ linters: - wastedassign # wastedassign finds wasted assignment statements. [fast: true, auto-fix: false] + # - goconst # Finds repeated strings that could be replaced by a constant. 
# - depguard # Go linter that checks if package imports are in a list of acceptable packages [fast: true, auto-fix: false] # nlreturn # nlreturn checks for a new line before return and branch statements to increase code clarity [fast: true, auto-fix: false] # don't enable: diff --git a/packages/evm/solidity/abi.go b/packages/evm/solidity/abi.go index 2d0074f5b8..7d13b6e1b2 100644 --- a/packages/evm/solidity/abi.go +++ b/packages/evm/solidity/abi.go @@ -42,7 +42,7 @@ func StorageEncodeString(slotNumber uint8, s string) (ret map[common.Hash]common ret[mainSlot] = common.BigToHash(big.NewInt(int64(len(s)*2) + 1)) i := 0 - for len(s) > 0 { + for s != "" { var chunk common.Hash copy(chunk[:], s) diff --git a/packages/nodeconn/nodeconn.go b/packages/nodeconn/nodeconn.go index 72dcd486b3..84cb957348 100644 --- a/packages/nodeconn/nodeconn.go +++ b/packages/nodeconn/nodeconn.go @@ -87,7 +87,9 @@ func (nc *nodeConnection) RefreshOnLedgerRequests(ctx context.Context, chainID i if !ok { panic("unexpected chainID") } - ncChain.refreshOwnedOutputs(ctx) + if err := ncChain.refreshOwnedOutputs(ctx); err != nil { + nc.LogError(fmt.Sprintf("error refreshing outputs: %s", err.Error())) + } } func New( diff --git a/packages/trie/test/proof_test.go b/packages/trie/test/proof_test.go index dc8846b87e..e903817271 100644 --- a/packages/trie/test/proof_test.go +++ b/packages/trie/test/proof_test.go @@ -30,7 +30,7 @@ func TestProofScenariosBlake2b(t *testing.T) { p := trr.MerkleProof([]byte(k)) err = p.Validate(root.Bytes()) require.NoError(t, err) - if len(v) > 0 { + if v != "" { cID := trie.CommitToData([]byte(v)) err = p.ValidateWithTerminal(root.Bytes(), cID.Bytes()) require.NoError(t, err) diff --git a/packages/trie/test/trie_test.go b/packages/trie/test/trie_test.go index a45db0c03d..115600e8f0 100644 --- a/packages/trie/test/trie_test.go +++ b/packages/trie/test/trie_test.go @@ -299,7 +299,7 @@ func runUpdateScenario(trieUpdatable *trie.TrieUpdatable, store trie.KVStore, sc continue // key must not be empty } key = []byte(before) - if len(after) > 0 { + if after != "" { value = []byte(after) } } else { @@ -333,11 +333,11 @@ func checkResult(t *testing.T, trie *trie.TrieUpdatable, checklist map[string]st for key, expectedValue := range checklist { v := trie.GetStr(key) if traceScenarios { - if len(v) > 0 { + if v != "" { fmt.Printf("FOUND '%s': '%s' (expected '%s')\n", key, v, expectedValue) } else { fmt.Printf("NOT FOUND '%s' (expected '%s')\n", key, func() string { - if len(expectedValue) > 0 { + if expectedValue != "" { return "FOUND" } return "NOT FOUND" diff --git a/packages/trie/trie_kvstore.go b/packages/trie/trie_kvstore.go index 8add223dbc..23a7f80ff3 100644 --- a/packages/trie/trie_kvstore.go +++ b/packages/trie/trie_kvstore.go @@ -202,7 +202,7 @@ func (tr *TrieUpdatable) delete(triePath []byte) { nodes[i-1].removeChild(nil, idxAsChild) } } - assertf(nodes[0] != nil, "please do not delete root") //nolint:gosec // false positive + assertf(nodes[0] != nil, "please do not delete root") } func (tr *TrieUpdatable) mergeNodeIfNeeded(node *bufferedNode) *bufferedNode { diff --git a/packages/util/pipe/queue_test.go b/packages/util/pipe/queue_test.go index 98884d76ac..45863a0ad9 100644 --- a/packages/util/pipe/queue_test.go +++ b/packages/util/pipe/queue_test.go @@ -170,12 +170,11 @@ func TestLimitPriorityLimitedPriorityHashQueueTwice(t *testing.T) { return 3*index/2 - 20 } return (3*index - 41) / 2 - } else { - if index%2 == 1 { - return (3*index - 139) / 2 - } - return 3*index/2 - 70 } + if index%2 == 1 { 
+ return (3*index - 139) / 2 + } + return 3*index/2 - 70 } testQueueTwice(NewSimpleNothashableFactory(), q, elementsToAddSingle, alwaysTrueFun, limit, resultFun, t) } @@ -262,12 +261,11 @@ func testPriorityQueueTwice[E IntConvertible](factory Factory[E], makePriorityQu return 3*index/2 - 50 } return (3*index - 101) / 2 - } else { - if index%2 == 1 { - return (3*index - 199) / 2 - } - return 3*index/2 - 100 } + if index%2 == 1 { + return (3*index - 199) / 2 + } + return 3*index/2 - 100 } testQueueTwice(factory, q, elementsToAddSingle, alwaysTrueFun, 2*elementsToAddSingle, resultFun, t) } @@ -354,12 +352,11 @@ func TestLimitPriorityHashLimitedPriorityHashQueueDuplicates(t *testing.T) { return 3*index - 40 } return 3*index - 41 - } else { - if index%2 == 0 { - return 3*index - 139 - } - return 3*index - 140 } + if index%2 == 0 { + return 3*index - 139 + } + return 3*index - 140 } testQueueBasicAddLengthPeekRemove(NewSimpleHashableFactory(), q, 3*elementsToAddFirstIteration, addFun, addResultFun, limit, resultFun, t) } diff --git a/packages/vm/core/governance/governanceimpl/chaininfo.go b/packages/vm/core/governance/governanceimpl/chaininfo.go index 91416cf704..5215d26ec4 100644 --- a/packages/vm/core/governance/governanceimpl/chaininfo.go +++ b/packages/vm/core/governance/governanceimpl/chaininfo.go @@ -19,7 +19,7 @@ func getChainInfo(ctx isc.SandboxView) dict.Dict { ret.Set(governance.VarGasFeePolicyBytes, info.GasFeePolicy.Bytes()) ret.Set(governance.VarGasLimitsBytes, info.GasLimits.Bytes()) - if len(info.PublicURL) > 0 { + if info.PublicURL != "" { ret.Set(governance.VarPublicURL, codec.EncodeString(info.PublicURL)) } diff --git a/packages/webapi/controllers/chain/waitforrequest.go b/packages/webapi/controllers/chain/waitforrequest.go index ef34da6332..783bdf07a9 100644 --- a/packages/webapi/controllers/chain/waitforrequest.go +++ b/packages/webapi/controllers/chain/waitforrequest.go @@ -32,7 +32,7 @@ func (c *Controller) waitForRequestToFinish(e echo.Context) error { timeout := defaultTimeoutSeconds * time.Second timeoutInSeconds := e.QueryParam("timeoutSeconds") - if len(timeoutInSeconds) > 0 { + if timeoutInSeconds != "" { parsedTimeout, _ := strconv.Atoi(timeoutInSeconds) if err != nil { diff --git a/tools/schema/model/yaml/convert.go b/tools/schema/model/yaml/convert.go index 8c310abb21..0632ed4eb9 100644 --- a/tools/schema/model/yaml/convert.go +++ b/tools/schema/model/yaml/convert.go @@ -85,10 +85,10 @@ func (n *Node) toStringElt() model.DefElt { func (n *Node) toDefElt() *model.DefElt { comment := "" - if len(n.HeadComment) > 0 { + if n.HeadComment != "" { // remove trailing '\n' and space comment = strings.TrimSpace(n.HeadComment) - } else if len(n.LineComment) > 0 { + } else if n.LineComment != "" { // remove trailing '\n' and space comment = strings.TrimSpace(n.LineComment) } @@ -126,9 +126,9 @@ func (n *Node) toDefMapMap() model.DefMapMap { continue } comment := "" - if len(yamlKey.HeadComment) > 0 { + if yamlKey.HeadComment != "" { comment = strings.TrimSpace(yamlKey.HeadComment) // remove trailing '\n' - } else if len(yamlKey.LineComment) > 0 { + } else if yamlKey.LineComment != "" { comment = strings.TrimSpace(yamlKey.LineComment) // remove trailing '\n' } @@ -146,9 +146,9 @@ func (n *Node) toDefMapMap() model.DefMapMap { func (n *Node) toFuncDef() model.FuncDef { def := model.FuncDef{} def.Line = n.Line - if len(n.HeadComment) > 0 { + if n.HeadComment != "" { def.Comment = strings.TrimSpace(n.HeadComment) // remove trailing '\n' - } else if len(n.LineComment) > 0 { + } 
else if n.LineComment != "" { def.Comment = strings.TrimSpace(n.LineComment) // remove trailing '\n' } @@ -164,9 +164,9 @@ func (n *Node) toFuncDef() model.FuncDef { return model.FuncDef{} } def.Access = *yamlKey.Contents[0].toDefElt() - if len(yamlKey.HeadComment) > 0 { + if yamlKey.HeadComment != "" { def.Access.Comment = strings.TrimSpace(yamlKey.HeadComment) // remove trailing '\n' - } else if len(yamlKey.LineComment) > 0 { + } else if yamlKey.LineComment != "" { def.Access.Comment = strings.TrimSpace(yamlKey.LineComment) // remove trailing '\n' } case KeyParams: @@ -188,9 +188,9 @@ func (n *Node) toFuncDefMap() model.FuncDefMap { continue } comment := "" - if len(yamlKey.HeadComment) > 0 { + if yamlKey.HeadComment != "" { comment = strings.TrimSpace(yamlKey.HeadComment) // remove trailing '\n' - } else if len(yamlKey.LineComment) > 0 { + } else if yamlKey.LineComment != "" { comment = strings.TrimSpace(yamlKey.LineComment) // remove trailing '\n' } key := model.DefElt{ From 3aac8b217e1df27705c509e28c88c080ed69c2b9 Mon Sep 17 00:00:00 2001 From: Jorge Silva Date: Tue, 23 Apr 2024 12:17:42 +0100 Subject: [PATCH 7/7] lint fix --- packages/testutil/run_heavy_tests_false.go | 1 + packages/trie/trie_kvstore.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/testutil/run_heavy_tests_false.go b/packages/testutil/run_heavy_tests_false.go index e5680b6c8f..7f8f874219 100644 --- a/packages/testutil/run_heavy_tests_false.go +++ b/packages/testutil/run_heavy_tests_false.go @@ -5,6 +5,7 @@ package testutil import "testing" +//nolint:gocritic // its not a test function, but gets called by other test functions func RunHeavy(t *testing.T) { t.Logf("skipping heavy test %s", t.Name()) t.SkipNow() diff --git a/packages/trie/trie_kvstore.go b/packages/trie/trie_kvstore.go index 23a7f80ff3..8add223dbc 100644 --- a/packages/trie/trie_kvstore.go +++ b/packages/trie/trie_kvstore.go @@ -202,7 +202,7 @@ func (tr *TrieUpdatable) delete(triePath []byte) { nodes[i-1].removeChild(nil, idxAsChild) } } - assertf(nodes[0] != nil, "please do not delete root") + assertf(nodes[0] != nil, "please do not delete root") //nolint:gosec // false positive } func (tr *TrieUpdatable) mergeNodeIfNeeded(node *bufferedNode) *bufferedNode {
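
---

Editor's notes (illustrative sketches, not part of the patches above):

1. How the new Mempool* node parameters appear to map onto mempool.Settings. The pairing below is inferred from the defaults in the parameters hunk and the new Settings fields; the plumbing between the two is not part of this diff, so treat this as a sketch rather than actual wasp code:

	// hypothetical wiring, field pairing inferred from the hunks above
	settings := mempool.Settings{
		TTL:                        24 * time.Hour,   // MempoolTTL
		OnLedgerRefreshMinInterval: 10 * time.Minute, // MempoolOnLedgerRefreshMinInterval
		MaxOffledgerInPool:         2000,             // MempoolMaxOffledgerInPool
		MaxOnledgerInPool:          1000,             // MempoolMaxOnledgerInPool
		MaxTimedInPool:             100,              // MempoolMaxTimedInPool
		MaxOnledgerToPropose:       100,              // MempoolMaxOnledgerToPropose (includes timed requests)
		MaxOffledgerToPropose:      500,              // MempoolMaxOffledgerToPropose
	}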
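2. The drop-and-refresh mechanism added in the mempool patch, reduced to a minimal runnable sketch. Only ShouldRefreshRequests, the dropped-requests flag, and the rate-limited caller mirror the diff; the pool and mempool types here are simplified stand-ins:

	package main

	import (
		"fmt"
		"time"
	)

	// refreshablePool mirrors the pattern added to timePoolImpl and typedPool:
	// remember that requests were dropped under congestion, and only ask for a
	// refresh once the pool has drained.
	type refreshablePool struct {
		size       int  // requests currently in the pool
		hasDropped bool // set when the pool evicts a request over its size limit
	}

	func (p *refreshablePool) ShouldRefreshRequests() bool {
		if !p.hasDropped {
			return false
		}
		if p.size > 0 {
			return false // wait until the pool is empty to refresh
		}
		p.hasDropped = false // the caller refreshes now, so don't ask again
		return true
	}

	// mempoolSketch mirrors the caller side in handleRePublishTimeTick.
	type mempoolSketch struct {
		pool        *refreshablePool
		lastRefresh time.Time
		minInterval time.Duration
		refresh     func() // e.g. the injected nodeConn.RefreshOnLedgerRequests closure
	}

	func (m *mempoolSketch) maybeRefresh() {
		if time.Since(m.lastRefresh) < m.minInterval {
			return // rate-limited, to avoid a dropping/refreshing cycle under load
		}
		if m.pool.ShouldRefreshRequests() {
			m.refresh()
			m.lastRefresh = time.Now()
		}
	}

	func main() {
		m := &mempoolSketch{
			pool:        &refreshablePool{size: 0, hasDropped: true},
			minInterval: 10 * time.Minute,
			refresh:     func() { fmt.Println("refreshing on-ledger requests from L1") },
		}
		m.maybeRefresh() // prints: refreshing on-ledger requests from L1
	}

Note the design choice visible in both pool implementations: the refresh is deferred until the pool is empty, so it never competes with requests that are still queued, and the flag is cleared as soon as a refresh is requested, so at most one refresh is triggered per congestion episode.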
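3. The mechanical rewrites in PATCH 6/7 follow two standard Go style rules: prefer s != "" over len(s) > 0 for strings, and drop an else block whose if branch ends in return. The two forms are semantically identical; a tiny before/after sketch (hypothetical function, not from the diffs):

	// before: flagged by the linters
	func describeBefore(s string) string {
		if len(s) > 0 {
			return "non-empty"
		} else {
			return "empty"
		}
	}

	// after: equivalent behavior, idiomatic form
	func describeAfter(s string) string {
		if s != "" {
			return "non-empty"
		}
		return "empty"
	}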