From e3cbd01f63209b24e8e024e20cb4c17d37f26855 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 5 Dec 2022 14:15:21 -0800 Subject: [PATCH] Cache headers and logs using accepted depth instead of LRU (#387) * save progress * save progress * saving more progress * add more context * more comments * fix items * fix removal * passs config * add additional comment * move cache init * remove TODO * fix logs format * nits * insert into cache oldest first * make bounded buffer generic * cleanup FIFO cache * cleanup comments * simplify bounded buffer * fix `Last` * nits * more comment nits * fix min size of FIFOCache * add NoOpFIFOCache * move `FlattenLogs` to `types` * fix off by 1 * revert LastBloomIndex * fifo cache nits * simplify bounded buffer * move acceptedHeaderCache to headerchain * remove unnecessary comment --- accounts/abi/bind/backends/simulated.go | 1 - core/blockchain.go | 82 ++++++++++++++++++++----- core/blockchain_reader.go | 9 +++ core/bounded_buffer.go | 56 ++++++++++------- core/fifo_cache.go | 72 ++++++++++++++++++++++ core/headerchain.go | 28 +++++---- core/state_manager.go | 8 +-- core/types/log.go | 9 +++ eth/api_backend.go | 10 ++- eth/backend.go | 6 +- eth/ethconfig/config.go | 32 +++++----- eth/filters/filter.go | 38 +++++------- eth/filters/filter_system.go | 52 +++------------- eth/filters/filter_system_test.go | 5 -- plugin/evm/config.go | 16 ++--- plugin/evm/vm.go | 3 +- 16 files changed, 263 insertions(+), 164 deletions(-) create mode 100644 core/fifo_cache.go diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 16aff3f063..68c544eaf6 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -945,7 +945,6 @@ func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event } func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 } -func (fb *filterBackend) LastBloomIndex() uint64 { return 0 } func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) { panic("not supported") diff --git a/core/blockchain.go b/core/blockchain.go index ada9fa3a26..8f20865fc1 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -164,6 +164,7 @@ type CacheConfig struct { SnapshotVerify bool // Verify generated snapshots SkipSnapshotRebuild bool // Whether to skip rebuilding the snapshot in favor of returning an error (only set to true for tests) Preimages bool // Whether to store preimage of trie key to the disk + AcceptedCacheSize int // Depth of accepted headers cache and accepted logs cache at the accepted tip } var DefaultCacheConfig = &CacheConfig{ @@ -174,6 +175,7 @@ var DefaultCacheConfig = &CacheConfig{ CommitInterval: 4096, AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay SnapshotLimit: 256, + AcceptedCacheSize: 32, } // BlockChain represents the canonical chain given a database with a genesis @@ -276,6 +278,9 @@ type BlockChain struct { // [flattenLock] prevents the [acceptor] from flattening snapshots while // a block is being verified. flattenLock sync.Mutex + + // [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs. + acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log] } // NewBlockChain returns a fully initialised block chain using information @@ -305,24 +310,25 @@ func NewBlockChain( Preimages: cacheConfig.Preimages, StatsPrefix: trieCleanCacheStatsNamespace, }), - bodyCache: bodyCache, - receiptsCache: receiptsCache, - blockCache: blockCache, - txLookupCache: txLookupCache, - feeConfigCache: feeConfigCache, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, - senderCacher: newTxSenderCacher(runtime.NumCPU()), - acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit), - quit: make(chan struct{}), + bodyCache: bodyCache, + receiptsCache: receiptsCache, + blockCache: blockCache, + txLookupCache: txLookupCache, + feeConfigCache: feeConfigCache, + engine: engine, + vmConfig: vmConfig, + badBlocks: badBlocks, + senderCacher: newTxSenderCacher(runtime.NumCPU()), + acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit), + quit: make(chan struct{}), + acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize), } bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error - bc.hc, err = NewHeaderChain(db, chainConfig, engine) + bc.hc, err = NewHeaderChain(db, chainConfig, cacheConfig, engine) if err != nil { return nil, err } @@ -373,6 +379,9 @@ func NewBlockChain( bc.initSnapshot(head) } + // Warm up [hc.acceptedNumberCache] and [acceptedLogsCache] + bc.warmAcceptedCaches() + // Start processing accepted blocks effects in the background go bc.startAcceptor() @@ -435,6 +444,41 @@ func (bc *BlockChain) flattenSnapshot(postAbortWork func() error, hash common.Ha return bc.snaps.Flatten(hash) } +// warmAcceptedCaches fetches previously accepted headers and logs from disk to +// pre-populate [hc.acceptedNumberCache] and [acceptedLogsCache]. +func (bc *BlockChain) warmAcceptedCaches() { + var ( + startTime = time.Now() + lastAccepted = bc.LastAcceptedBlock().NumberU64() + startIndex = uint64(1) + targetCacheSize = uint64(bc.cacheConfig.AcceptedCacheSize) + ) + if targetCacheSize == 0 { + log.Info("Not warming accepted cache because disabled") + return + } + if lastAccepted < startIndex { + // This could occur if we haven't accepted any blocks yet + log.Info("Not warming accepted cache because there are no accepted blocks") + return + } + cacheDiff := targetCacheSize - 1 // last accepted lookback is inclusive, so we reduce size by 1 + if cacheDiff < lastAccepted { + startIndex = lastAccepted - cacheDiff + } + for i := startIndex; i <= lastAccepted; i++ { + header := bc.GetHeaderByNumber(i) + if header == nil { + // This could happen if a node state-synced + log.Info("Exiting accepted cache warming early because header is nil", "height", i, "t", time.Since(startTime)) + break + } + bc.hc.acceptedNumberCache.Put(header.Number.Uint64(), header) + bc.acceptedLogsCache.Put(header.Hash(), rawdb.ReadLogs(bc.db, header.Hash(), header.Number.Uint64())) + } + log.Info("Warmed accepted caches", "start", startIndex, "end", lastAccepted, "t", time.Since(startTime)) +} + // startAcceptor starts processing items on the [acceptorQueue]. If a [nil] // object is placed on the [acceptorQueue], the [startAcceptor] will exit. func (bc *BlockChain) startAcceptor() { @@ -455,13 +499,16 @@ func (bc *BlockChain) startAcceptor() { log.Crit("failed to write accepted block effects", "err", err) } - // Fetch block logs - logs := bc.gatherBlockLogs(next.Hash(), next.NumberU64(), false) + // Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content + bc.hc.acceptedNumberCache.Put(next.NumberU64(), next.Header()) + logs := rawdb.ReadLogs(bc.db, next.Hash(), next.NumberU64()) + bc.acceptedLogsCache.Put(next.Hash(), logs) // Update accepted feeds - bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: logs}) - if len(logs) > 0 { - bc.logsAcceptedFeed.Send(logs) + flattenedLogs := types.FlattenLogs(logs) + bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: flattenedLogs}) + if len(flattenedLogs) > 0 { + bc.logsAcceptedFeed.Send(flattenedLogs) } if len(next.Transactions()) != 0 { bc.txAcceptedFeed.Send(NewTxsEvent{next.Transactions()}) @@ -911,6 +958,7 @@ func (bc *BlockChain) Accept(block *types.Block) error { } } + // Enqueue block in the acceptor bc.lastAccepted = block bc.addAcceptorQueue(block) acceptedBlockGasUsedCounter.Inc(int64(block.GasUsed())) diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 552c738834..130fe0e34d 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -405,3 +405,12 @@ func (bc *BlockChain) GetCoinbaseAt(parent *types.Header) (common.Address, bool, rewardAddress, feeRecipients := precompile.GetStoredRewardAddress(stateDB) return rewardAddress, feeRecipients, nil } + +// GetLogs fetches all logs from a given block. +func (bc *BlockChain) GetLogs(hash common.Hash, number uint64) [][]*types.Log { + logs, ok := bc.acceptedLogsCache.Get(hash) // this cache is thread-safe + if ok { + return logs + } + return rawdb.ReadLogs(bc.db, hash, number) +} diff --git a/core/bounded_buffer.go b/core/bounded_buffer.go index c99042fa3e..b6170682d9 100644 --- a/core/bounded_buffer.go +++ b/core/bounded_buffer.go @@ -3,37 +3,42 @@ package core -import ( - "github.com/ethereum/go-ethereum/common" -) - -// BoundedBuffer keeps [size] common.Hash entries in a buffer and calls -// [callback] on any item that is evicted. This is typically used for +// BoundedBuffer keeps [size] entries of type [K] in a buffer and calls +// [callback] on any item that is overwritten. This is typically used for // dereferencing old roots during block processing. -type BoundedBuffer struct { +// +// BoundedBuffer is not thread-safe and requires the caller synchronize usage. +type BoundedBuffer[K any] struct { lastPos int size int - callback func(common.Hash) - buffer []common.Hash + callback func(K) + buffer []K + + cycled bool } // NewBoundedBuffer creates a new [BoundedBuffer]. -func NewBoundedBuffer(size int, callback func(common.Hash)) *BoundedBuffer { - return &BoundedBuffer{ +func NewBoundedBuffer[K any](size int, callback func(K)) *BoundedBuffer[K] { + return &BoundedBuffer[K]{ + lastPos: -1, size: size, callback: callback, - buffer: make([]common.Hash, size), + buffer: make([]K, size), } } -// Insert adds a new common.Hash to the buffer. If the buffer is full, the -// oldest common.Hash will be evicted and [callback] will be invoked. -// -// WARNING: BoundedBuffer does not support the insertion of empty common.Hash. -// Inserting such data will cause unintended behavior. -func (b *BoundedBuffer) Insert(h common.Hash) { - nextPos := (b.lastPos + 1) % b.size // the first item added to the buffer will be at position 1 - if b.buffer[nextPos] != (common.Hash{}) { +// Insert adds a new value to the buffer. If the buffer is full, the +// oldest value will be overwritten and [callback] will be invoked. +func (b *BoundedBuffer[K]) Insert(h K) { + nextPos := b.lastPos + 1 // the first item added to the buffer will be at position 0 + if nextPos == b.size { + nextPos = 0 + // Set [cycled] since we are back to the 0th element + b.cycled = true + } + if b.cycled { + // We ensure we have cycled through the buffer once before invoking the + // [callback] to ensure we don't call it with unset values. b.callback(b.buffer[nextPos]) } b.buffer[nextPos] = h @@ -41,7 +46,12 @@ func (b *BoundedBuffer) Insert(h common.Hash) { } // Last retrieves the last item added to the buffer. -// If no items have been added to the buffer, Last returns an empty hash. -func (b *BoundedBuffer) Last() common.Hash { - return b.buffer[b.lastPos] +// +// If no items have been added to the buffer, Last returns the default value of +// [K] and [false]. +func (b *BoundedBuffer[K]) Last() (K, bool) { + if b.lastPos == -1 { + return *new(K), false + } + return b.buffer[b.lastPos], true } diff --git a/core/fifo_cache.go b/core/fifo_cache.go new file mode 100644 index 0000000000..ae6feffcce --- /dev/null +++ b/core/fifo_cache.go @@ -0,0 +1,72 @@ +// (c) 2021, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package core + +import ( + "sync" +) + +var ( + _ FIFOCache[int, int] = (*BufferFIFOCache[int, int])(nil) + _ FIFOCache[int, int] = (*NoOpFIFOCache[int, int])(nil) +) + +// FIFOCache evicts the oldest element added to it after [limit] items are +// added. +type FIFOCache[K comparable, V any] interface { + Put(K, V) + Get(K) (V, bool) +} + +// NewFIFOCache creates a new First-In-First-Out cache of size [limit]. +// +// If a [limit] of 0 is passed as an argument, a no-op cache is returned that +// does nothing. +func NewFIFOCache[K comparable, V any](limit int) FIFOCache[K, V] { + if limit <= 0 { + return &NoOpFIFOCache[K, V]{} + } + + c := &BufferFIFOCache[K, V]{ + m: make(map[K]V, limit), + } + c.buffer = NewBoundedBuffer(limit, c.remove) + return c +} + +type BufferFIFOCache[K comparable, V any] struct { + l sync.RWMutex + + buffer *BoundedBuffer[K] + m map[K]V +} + +func (f *BufferFIFOCache[K, V]) Put(key K, val V) { + f.l.Lock() + defer f.l.Unlock() + + f.buffer.Insert(key) // Insert will remove the oldest [K] if we are at the [limit] + f.m[key] = val +} + +func (f *BufferFIFOCache[K, V]) Get(key K) (V, bool) { + f.l.RLock() + defer f.l.RUnlock() + + v, ok := f.m[key] + return v, ok +} + +// remove is used as the callback in [BoundedBuffer]. It is assumed that the +// [WriteLock] is held when this is accessed. +func (f *BufferFIFOCache[K, V]) remove(key K) { + delete(f.m, key) +} + +type NoOpFIFOCache[K comparable, V any] struct{} + +func (f *NoOpFIFOCache[K, V]) Put(_ K, _ V) {} +func (f *NoOpFIFOCache[K, V]) Get(_ K) (V, bool) { + return *new(V), false +} diff --git a/core/headerchain.go b/core/headerchain.go index c5a1a2ab94..9f404db426 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -70,9 +70,10 @@ type HeaderChain struct { currentHeader atomic.Value // Current head of the header chain (may be above the block chain!) currentHeaderHash common.Hash // Hash of the current head of the header chain (prevent recomputing all the time) - headerCache *lru.Cache // Cache for the most recent block headers - tdCache *lru.Cache // Cache for the most recent block total difficulties - numberCache *lru.Cache // Cache for the most recent block numbers + headerCache *lru.Cache // Cache for the most recent block headers + tdCache *lru.Cache // Cache for the most recent block total difficulties + numberCache *lru.Cache // Cache for the most recent block numbers + acceptedNumberCache FIFOCache[uint64, *types.Header] // Cache for most recent accepted heights to headers (only modified in accept) rand *mrand.Rand engine consensus.Engine @@ -80,10 +81,11 @@ type HeaderChain struct { // NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points // to the parent's interrupt semaphore. -func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine) (*HeaderChain, error) { +func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, cacheConfig *CacheConfig, engine consensus.Engine) (*HeaderChain, error) { headerCache, _ := lru.New(headerCacheLimit) tdCache, _ := lru.New(tdCacheLimit) numberCache, _ := lru.New(numberCacheLimit) + acceptedNumberCache := NewFIFOCache[uint64, *types.Header](cacheConfig.AcceptedCacheSize) // Seed a fast but crypto originating random generator seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64)) @@ -92,13 +94,14 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c } hc := &HeaderChain{ - config: config, - chainDb: chainDb, - headerCache: headerCache, - tdCache: tdCache, - numberCache: numberCache, - rand: mrand.New(mrand.NewSource(seed.Int64())), - engine: engine, + config: config, + chainDb: chainDb, + headerCache: headerCache, + tdCache: tdCache, + numberCache: numberCache, + acceptedNumberCache: acceptedNumberCache, + rand: mrand.New(mrand.NewSource(seed.Int64())), + engine: engine, } hc.genesisHeader = hc.GetHeaderByNumber(0) @@ -170,6 +173,9 @@ func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool { // GetHeaderByNumber retrieves a block header from the database by number, // caching it (associated with its hash) if found. func (hc *HeaderChain) GetHeaderByNumber(number uint64) *types.Header { + if cachedHeader, ok := hc.acceptedNumberCache.Get(number); ok { + return cachedHeader + } hash := rawdb.ReadCanonicalHash(hc.chainDb, number) if hash == (common.Hash{}) { return nil diff --git a/core/state_manager.go b/core/state_manager.go index faed13ed94..069556f486 100644 --- a/core/state_manager.go +++ b/core/state_manager.go @@ -79,7 +79,7 @@ func NewTrieWriter(db TrieDB, config *CacheConfig) TrieWriter { targetCommitSize: common.StorageSize(config.TrieDirtyCommitTarget) * 1024 * 1024, imageCap: 4 * 1024 * 1024, commitInterval: config.CommitInterval, - tipBuffer: NewBoundedBuffer(tipBufferSize, db.Dereference), + tipBuffer: NewBoundedBuffer[common.Hash](tipBufferSize, db.Dereference), } cm.flushStepSize = (cm.memoryCap - cm.targetCommitSize) / common.StorageSize(flushWindow) return cm @@ -121,7 +121,7 @@ type cappedMemoryTrieWriter struct { imageCap common.StorageSize commitInterval uint64 - tipBuffer *BoundedBuffer + tipBuffer *BoundedBuffer[common.Hash] } func (cm *cappedMemoryTrieWriter) InsertTrie(block *types.Block) error { @@ -192,8 +192,8 @@ func (cm *cappedMemoryTrieWriter) RejectTrie(block *types.Block) error { func (cm *cappedMemoryTrieWriter) Shutdown() error { // If [tipBuffer] entry is empty, no need to do any cleanup on // shutdown. - last := cm.tipBuffer.Last() - if last == (common.Hash{}) { + last, exists := cm.tipBuffer.Last() + if !exists { return nil } diff --git a/core/types/log.go b/core/types/log.go index 8c429e9c3a..131ef8599a 100644 --- a/core/types/log.go +++ b/core/types/log.go @@ -148,3 +148,12 @@ func (l *LogForStorage) DecodeRLP(s *rlp.Stream) error { } return err } + +// FlattenLogs converts a nested array of logs to a single array of logs. +func FlattenLogs(list [][]*Log) []*Log { + var flat []*Log + for _, logs := range list { + flat = append(flat, logs...) + } + return flat +} diff --git a/eth/api_backend.go b/eth/api_backend.go index 1c2860a339..14aeda94af 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -252,7 +252,10 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type } func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { - return rawdb.ReadLogs(b.eth.chainDb, hash, number), nil + if err := ctx.Err(); err != nil { + return nil, err + } + return b.eth.blockchain.GetLogs(hash, number), nil } func (b *EthAPIBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config) (*vm.EVM, func() error, error) { @@ -429,11 +432,6 @@ func (b *EthAPIBackend) BloomStatus() (uint64, uint64) { return params.BloomBitsBlocks, sections } -func (b *EthAPIBackend) LastBloomIndex() uint64 { - size, sections := b.BloomStatus() - return size * sections -} - func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { for i := 0; i < bloomFilterThreads; i++ { go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests) diff --git a/eth/backend.go b/eth/backend.go index a704939b33..7e9aaebd84 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -214,6 +214,7 @@ func New( SnapshotVerify: config.SnapshotVerify, SkipSnapshotRebuild: config.SkipSnapshotRebuild, Preimages: config.Preimages, + AcceptedCacheSize: config.AcceptedCacheSize, } ) @@ -281,11 +282,8 @@ func (s *Ethereum) APIs() []rpc.API { apis = append(apis, s.stackRPCs...) // Create [filterSystem] with the log cache size set in the config. - ethcfg := s.APIBackend.eth.config filterSystem := filters.NewFilterSystem(s.APIBackend, filters.Config{ - IndexedLogCacheSize: ethcfg.IndexedFilterLogCacheSize, - UnindexedLogCacheSize: ethcfg.UnindexedFilterLogCacheSize, - Timeout: 5 * time.Minute, + Timeout: 5 * time.Minute, }) // Append all the local APIs and return diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index dc0f16e8ed..c1be8c0ead 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -52,19 +52,18 @@ var DefaultConfig = NewDefaultConfig() func NewDefaultConfig() Config { return Config{ - NetworkId: 1, - TrieCleanCache: 512, - TrieDirtyCache: 256, - TrieDirtyCommitTarget: 20, - SnapshotCache: 256, - IndexedFilterLogCacheSize: 32, - UnindexedFilterLogCacheSize: 32, - Miner: miner.Config{}, - TxPool: core.DefaultTxPoolConfig, - RPCGasCap: 25000000, - RPCEVMTimeout: 5 * time.Second, - GPO: DefaultFullGPOConfig, - RPCTxFeeCap: 1, + NetworkId: 1, + TrieCleanCache: 512, + TrieDirtyCache: 256, + TrieDirtyCommitTarget: 20, + SnapshotCache: 256, + AcceptedCacheSize: 32, + Miner: miner.Config{}, + TxPool: core.DefaultTxPoolConfig, + RPCGasCap: 25000000, + RPCEVMTimeout: 5 * time.Second, + GPO: DefaultFullGPOConfig, + RPCTxFeeCap: 1, } } @@ -102,10 +101,9 @@ type Config struct { SnapshotCache int Preimages bool - // This is the number of indexed blocks for which logs will be cached in the filter system. - IndexedFilterLogCacheSize int - // This is the number of unindexed blocks for which logs will be cached in the filter system. - UnindexedFilterLogCacheSize int + // AcceptedCacheSize is the depth of accepted headers cache and accepted + // logs cache at the accepted tip. + AcceptedCacheSize int // Mining options Miner miner.Config diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 629730ffda..65869a5f4d 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -128,7 +128,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { if header == nil { return nil, errors.New("unknown block") } - return f.blockLogs(ctx, header, false, f.sys.backend.LastBloomIndex() > header.Number.Uint64()) + return f.blockLogs(ctx, header, false) } // Short-cut if all we care about is pending logs if f.begin == rpc.PendingBlockNumber.Int64() { @@ -176,14 +176,14 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } // Gather all indexed logs, and finish with non indexed ones var ( - logs []*types.Log - lastIndexed = f.sys.backend.LastBloomIndex() + logs []*types.Log + size, sections = f.sys.backend.BloomStatus() ) - if lastIndexed > uint64(f.begin) { - if lastIndexed > end { + if indexed := sections * size; indexed > uint64(f.begin) { + if indexed > end { logs, err = f.indexedLogs(ctx, end) } else { - logs, err = f.indexedLogs(ctx, lastIndexed-1) + logs, err = f.indexedLogs(ctx, indexed-1) } if err != nil { return logs, err @@ -229,7 +229,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err if header == nil || err != nil { return logs, err } - found, err := f.blockLogs(ctx, header, true, true) + found, err := f.blockLogs(ctx, header, true) if err != nil { return logs, err } @@ -251,7 +251,7 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e if header == nil || err != nil { return logs, err } - found, err := f.blockLogs(ctx, header, false, false) + found, err := f.blockLogs(ctx, header, false) if err != nil { return logs, err } @@ -261,29 +261,29 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e } // blockLogs returns the logs matching the filter criteria within a single block. -func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool, indexed bool) ([]*types.Log, error) { +func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) { // Fast track: no filtering criteria if len(f.addresses) == 0 && len(f.topics) == 0 { - list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64(), indexed) + list, err := f.sys.getLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil, err } - return flatten(list), nil + return types.FlattenLogs(list), nil } else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) { - return f.checkMatches(ctx, header, indexed) + return f.checkMatches(ctx, header) } return nil, nil } // checkMatches checks if the receipts belonging to the given header contain any log events that // match the filter criteria. This function is called when the bloom filter signals a potential match. -func (f *Filter) checkMatches(ctx context.Context, header *types.Header, indexed bool) ([]*types.Log, error) { - logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64(), indexed) +func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) { + logsList, err := f.sys.getLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil, err } - unfiltered := flatten(logsList) + unfiltered := types.FlattenLogs(logsList) logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics) if len(logs) > 0 { // We have matching logs, check if we need to resolve full logs via the light client @@ -377,11 +377,3 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo } return true } - -func flatten(list [][]*types.Log) []*types.Log { - var flat []*types.Log - for _, logs := range list { - flat = append(flat, logs...) - } - return flat -} diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 5d30a04e3b..f5ec9fc66c 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -45,26 +45,17 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" - lru "github.com/hashicorp/golang-lru" ) // Config represents the configuration of the filter system. type Config struct { - IndexedLogCacheSize int // maximum number of indexed cached blocks (default: 32) - UnindexedLogCacheSize int // maximum number of unindexed cached blocks (default: 32) - Timeout time.Duration // how long filters stay active (default: 5min) + Timeout time.Duration // how long filters stay active (default: 5min) } func (cfg Config) withDefaults() Config { if cfg.Timeout == 0 { cfg.Timeout = 5 * time.Minute } - if cfg.IndexedLogCacheSize == 0 { - cfg.IndexedLogCacheSize = 32 - } - if cfg.UnindexedLogCacheSize == 0 { - cfg.UnindexedLogCacheSize = 32 - } return cfg } @@ -87,7 +78,6 @@ type Backend interface { SubscribeAcceptedTransactionEvent(ch chan<- core.NewTxsEvent) event.Subscription BloomStatus() (uint64, uint64) - LastBloomIndex() uint64 ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) // Added to the backend interface to support limiting of logs requests @@ -98,43 +88,22 @@ type Backend interface { // FilterSystem holds resources shared by all filters. type FilterSystem struct { - backend Backend - indexedLogsCache *lru.Cache - unindexedLogsCache *lru.Cache - cfg *Config + backend Backend + cfg *Config } // NewFilterSystem creates a filter system. func NewFilterSystem(backend Backend, config Config) *FilterSystem { config = config.withDefaults() - - indexedCache, err := lru.New(config.IndexedLogCacheSize) - if err != nil { - panic(err) - } - unindexedCache, err := lru.New(config.UnindexedLogCacheSize) - if err != nil { - panic(err) - } return &FilterSystem{ - backend: backend, - indexedLogsCache: indexedCache, - unindexedLogsCache: unindexedCache, - cfg: &config, + backend: backend, + cfg: &config, } } -// cachedGetLogs loads block logs from the backend and caches the result. -func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64, indexed bool) ([][]*types.Log, error) { - cache := sys.indexedLogsCache - if !indexed { - cache = sys.unindexedLogsCache - } - cached, ok := cache.Get(blockHash) - if ok { - return cached.([][]*types.Log), nil - } - +// getLogs loads block logs from the backend. The backend is responsible for +// performing any log caching. +func (sys *FilterSystem) getLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) { logs, err := sys.backend.GetLogs(ctx, blockHash, number) if err != nil { return nil, err @@ -142,7 +111,6 @@ func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Has if logs == nil { return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString()) } - cache.Add(blockHash, logs) return logs, nil } @@ -641,9 +609,7 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. // Get the logs of the block ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - indexed := es.sys.backend.LastBloomIndex() - headerNumber := header.Number.Uint64() - logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), headerNumber, indexed > headerNumber) + logsList, err := es.sys.getLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index e1a7374e97..74dca1fbca 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -153,11 +153,6 @@ func (b *testBackend) BloomStatus() (uint64, uint64) { return params.BloomBitsBlocks, b.sections } -func (b *testBackend) LastBloomIndex() uint64 { - size, sections := b.BloomStatus() - return size * sections -} - func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { requests := make(chan chan *bloombits.Retrieval) diff --git a/plugin/evm/config.go b/plugin/evm/config.go index a57bef3105..a738312388 100644 --- a/plugin/evm/config.go +++ b/plugin/evm/config.go @@ -44,8 +44,7 @@ const ( defaultMaxOutboundActiveRequests = 16 defaultPopulateMissingTriesParallelism = 1024 defaultStateSyncServerTrieCache = 64 // MB - defaultIndexedFilterLogCacheSize = 32 - defaultUnindexedFilterLogCacheSize = 32 // There can be up to [params.BloomBitsBlocks = 4096] unindexed blocks + defaultAcceptedCacheSize = 32 // blocks // defaultStateSyncMinBlocks is the minimum number of blocks the blockchain // should be ahead of local last accepted to perform state sync. @@ -177,10 +176,12 @@ type Config struct { // identical state with the pre-upgrade ruleset. SkipUpgradeCheck bool `json:"skip-upgrade-check"` - // IndexedLogCacheSize is the number of indexed blocks for which logs will be cached in the filter system. - IndexedLogCacheSize int `json:"indexed-log-cache-size"` - // UnindexedLogCacheSize is the number of unindexed blocks for which logs will be cached in the filter system. - UnindexedLogCacheSize int `json:"unindexed-log-cache-size"` + // AcceptedCacheSize is the depth to keep in the accepted headers cache and the + // accepted logs cache at the accepted tip. + // + // This is particularly useful for improving the performance of eth_getLogs + // on RPC nodes. + AcceptedCacheSize int `json:"accepted-cache-size"` } // EthAPIs returns an array of strings representing the Eth APIs that should be enabled @@ -226,8 +227,7 @@ func (c *Config) SetDefaults() { c.StateSyncCommitInterval = defaultSyncableCommitInterval c.StateSyncMinBlocks = defaultStateSyncMinBlocks c.AllowUnprotectedTxHashes = defaultAllowUnprotectedTxHashes - c.IndexedLogCacheSize = defaultIndexedFilterLogCacheSize - c.UnindexedLogCacheSize = defaultUnindexedFilterLogCacheSize + c.AcceptedCacheSize = defaultAcceptedCacheSize } func (d *Duration) UnmarshalJSON(data []byte) (err error) { diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index bfbe12b3bf..bb7b0100d9 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -336,8 +336,7 @@ func (vm *VM) Initialize( vm.ethConfig.OfflinePruningDataDirectory = vm.config.OfflinePruningDataDirectory vm.ethConfig.CommitInterval = vm.config.CommitInterval vm.ethConfig.SkipUpgradeCheck = vm.config.SkipUpgradeCheck - vm.ethConfig.IndexedFilterLogCacheSize = vm.config.IndexedLogCacheSize - vm.ethConfig.UnindexedFilterLogCacheSize = vm.config.UnindexedLogCacheSize + vm.ethConfig.AcceptedCacheSize = vm.config.AcceptedCacheSize // Create directory for offline pruning if len(vm.ethConfig.OfflinePruningDataDirectory) != 0 {