Skip to content

Commit

Permalink
Improved eth_getLogs caching (#386)
Browse files Browse the repository at this point in the history
* change filter system

* pipe through indexing

* pipe config

* add more logs

* fix incorrect usage

* fix indexed usage

* fix light client logs

* add LastBloomIndex

* fix tests

* nits
  • Loading branch information
patrick-ogrady authored Dec 1, 2022
1 parent 2a9569a commit a54eae2
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 44 deletions.
1 change: 1 addition & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event
}

func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
func (fb *filterBackend) LastBloomIndex() uint64 { return 0 }

func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
panic("not supported")
Expand Down
5 changes: 5 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,11 @@ func (b *EthAPIBackend) BloomStatus() (uint64, uint64) {
return params.BloomBitsBlocks, sections
}

func (b *EthAPIBackend) LastBloomIndex() uint64 {
size, sections := b.BloomStatus()
return size * sections
}

func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
for i := 0; i < bloomFilterThreads; i++ {
go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
Expand Down
5 changes: 3 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,9 @@ func (s *Ethereum) APIs() []rpc.API {
// Create [filterSystem] with the log cache size set in the config.
ethcfg := s.APIBackend.eth.config
filterSystem := filters.NewFilterSystem(s.APIBackend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
Timeout: 5 * time.Minute,
IndexedLogCacheSize: ethcfg.IndexedFilterLogCacheSize,
UnindexedLogCacheSize: ethcfg.UnindexedFilterLogCacheSize,
Timeout: 5 * time.Minute,
})

// Append all the local APIs and return
Expand Down
31 changes: 17 additions & 14 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,19 @@ var DefaultConfig = NewDefaultConfig()

func NewDefaultConfig() Config {
return Config{
NetworkId: 1,
TrieCleanCache: 512,
TrieDirtyCache: 256,
TrieDirtyCommitTarget: 20,
SnapshotCache: 256,
FilterLogCacheSize: 32,
Miner: miner.Config{},
TxPool: core.DefaultTxPoolConfig,
RPCGasCap: 25000000,
RPCEVMTimeout: 5 * time.Second,
GPO: DefaultFullGPOConfig,
RPCTxFeeCap: 1,
NetworkId: 1,
TrieCleanCache: 512,
TrieDirtyCache: 256,
TrieDirtyCommitTarget: 20,
SnapshotCache: 256,
IndexedFilterLogCacheSize: 32,
UnindexedFilterLogCacheSize: 32,
Miner: miner.Config{},
TxPool: core.DefaultTxPoolConfig,
RPCGasCap: 25000000,
RPCEVMTimeout: 5 * time.Second,
GPO: DefaultFullGPOConfig,
RPCTxFeeCap: 1,
}
}

Expand Down Expand Up @@ -101,8 +102,10 @@ type Config struct {
SnapshotCache int
Preimages bool

// This is the number of blocks for which logs will be cached in the filter system.
FilterLogCacheSize int
// This is the number of indexed blocks for which logs will be cached in the filter system.
IndexedFilterLogCacheSize int
// This is the number of unindexed blocks for which logs will be cached in the filter system.
UnindexedFilterLogCacheSize int

// Mining options
Miner miner.Config
Expand Down
26 changes: 13 additions & 13 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
if header == nil {
return nil, errors.New("unknown block")
}
return f.blockLogs(ctx, header, false)
return f.blockLogs(ctx, header, false, f.sys.backend.LastBloomIndex() > header.Number.Uint64())
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
Expand Down Expand Up @@ -176,14 +176,14 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
// Gather all indexed logs, and finish with non indexed ones
var (
logs []*types.Log
size, sections = f.sys.backend.BloomStatus()
logs []*types.Log
lastIndexed = f.sys.backend.LastBloomIndex()
)
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
if lastIndexed > uint64(f.begin) {
if lastIndexed > end {
logs, err = f.indexedLogs(ctx, end)
} else {
logs, err = f.indexedLogs(ctx, indexed-1)
logs, err = f.indexedLogs(ctx, lastIndexed-1)
}
if err != nil {
return logs, err
Expand Down Expand Up @@ -229,7 +229,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
if header == nil || err != nil {
return logs, err
}
found, err := f.blockLogs(ctx, header, true)
found, err := f.blockLogs(ctx, header, true, true)
if err != nil {
return logs, err
}
Expand All @@ -251,7 +251,7 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
if header == nil || err != nil {
return logs, err
}
found, err := f.blockLogs(ctx, header, false)
found, err := f.blockLogs(ctx, header, false, false)
if err != nil {
return logs, err
}
Expand All @@ -261,24 +261,24 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
}

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) {
func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool, indexed bool) ([]*types.Log, error) {
// Fast track: no filtering criteria
if len(f.addresses) == 0 && len(f.topics) == 0 {
list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64(), indexed)
if err != nil {
return nil, err
}
return flatten(list), nil
} else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) {
return f.checkMatches(ctx, header)
return f.checkMatches(ctx, header, indexed)
}
return nil, nil
}

// checkMatches checks if the receipts belonging to the given header contain any log events that
// match the filter criteria. This function is called when the bloom filter signals a potential match.
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) {
logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
func (f *Filter) checkMatches(ctx context.Context, header *types.Header, indexed bool) ([]*types.Log, error) {
logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64(), indexed)
if err != nil {
return nil, err
}
Expand Down
47 changes: 32 additions & 15 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,20 @@ import (

// Config represents the configuration of the filter system.
type Config struct {
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
IndexedLogCacheSize int // maximum number of indexed cached blocks (default: 32)
UnindexedLogCacheSize int // maximum number of unindexed cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
}

func (cfg Config) withDefaults() Config {
if cfg.Timeout == 0 {
cfg.Timeout = 5 * time.Minute
}
if cfg.LogCacheSize == 0 {
cfg.LogCacheSize = 32
if cfg.IndexedLogCacheSize == 0 {
cfg.IndexedLogCacheSize = 32
}
if cfg.UnindexedLogCacheSize == 0 {
cfg.UnindexedLogCacheSize = 32
}
return cfg
}
Expand All @@ -83,6 +87,7 @@ type Backend interface {
SubscribeAcceptedTransactionEvent(ch chan<- core.NewTxsEvent) event.Subscription

BloomStatus() (uint64, uint64)
LastBloomIndex() uint64
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)

// Added to the backend interface to support limiting of logs requests
Expand All @@ -93,29 +98,39 @@ type Backend interface {

// FilterSystem holds resources shared by all filters.
type FilterSystem struct {
backend Backend
logsCache *lru.Cache
cfg *Config
backend Backend
indexedLogsCache *lru.Cache
unindexedLogsCache *lru.Cache
cfg *Config
}

// NewFilterSystem creates a filter system.
func NewFilterSystem(backend Backend, config Config) *FilterSystem {
config = config.withDefaults()

cache, err := lru.New(config.LogCacheSize)
indexedCache, err := lru.New(config.IndexedLogCacheSize)
if err != nil {
panic(err)
}
unindexedCache, err := lru.New(config.UnindexedLogCacheSize)
if err != nil {
panic(err)
}
return &FilterSystem{
backend: backend,
logsCache: cache,
cfg: &config,
backend: backend,
indexedLogsCache: indexedCache,
unindexedLogsCache: unindexedCache,
cfg: &config,
}
}

// cachedGetLogs loads block logs from the backend and caches the result.
func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) {
cached, ok := sys.logsCache.Get(blockHash)
func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64, indexed bool) ([][]*types.Log, error) {
cache := sys.indexedLogsCache
if !indexed {
cache = sys.unindexedLogsCache
}
cached, ok := cache.Get(blockHash)
if ok {
return cached.([][]*types.Log), nil
}
Expand All @@ -127,7 +142,7 @@ func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Has
if logs == nil {
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString())
}
sys.logsCache.Add(blockHash, logs)
cache.Add(blockHash, logs)
return logs, nil
}

Expand Down Expand Up @@ -626,7 +641,9 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
indexed := es.sys.backend.LastBloomIndex()
headerNumber := header.Number.Uint64()
logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), headerNumber, indexed > headerNumber)
if err != nil {
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ func (b *testBackend) BloomStatus() (uint64, uint64) {
return params.BloomBitsBlocks, b.sections
}

func (b *testBackend) LastBloomIndex() uint64 {
size, sections := b.BloomStatus()
return size * sections
}

func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
requests := make(chan chan *bloombits.Retrieval)

Expand Down
9 changes: 9 additions & 0 deletions plugin/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
defaultMaxOutboundActiveRequests = 16
defaultPopulateMissingTriesParallelism = 1024
defaultStateSyncServerTrieCache = 64 // MB
defaultIndexedFilterLogCacheSize = 32
defaultUnindexedFilterLogCacheSize = 32 // There can be up to [params.BloomBitsBlocks = 4096] unindexed blocks

// defaultStateSyncMinBlocks is the minimum number of blocks the blockchain
// should be ahead of local last accepted to perform state sync.
Expand Down Expand Up @@ -174,6 +176,11 @@ type Config struct {
// their node before the network upgrade and their node accepts blocks that have
// identical state with the pre-upgrade ruleset.
SkipUpgradeCheck bool `json:"skip-upgrade-check"`

// IndexedLogCacheSize is the number of indexed blocks for which logs will be cached in the filter system.
IndexedLogCacheSize int `json:"indexed-log-cache-size"`
// UnindexedLogCacheSize is the number of unindexed blocks for which logs will be cached in the filter system.
UnindexedLogCacheSize int `json:"unindexed-log-cache-size"`
}

// EthAPIs returns an array of strings representing the Eth APIs that should be enabled
Expand Down Expand Up @@ -219,6 +226,8 @@ func (c *Config) SetDefaults() {
c.StateSyncCommitInterval = defaultSyncableCommitInterval
c.StateSyncMinBlocks = defaultStateSyncMinBlocks
c.AllowUnprotectedTxHashes = defaultAllowUnprotectedTxHashes
c.IndexedLogCacheSize = defaultIndexedFilterLogCacheSize
c.UnindexedLogCacheSize = defaultUnindexedFilterLogCacheSize
}

func (d *Duration) UnmarshalJSON(data []byte) (err error) {
Expand Down
2 changes: 2 additions & 0 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ func (vm *VM) Initialize(
vm.ethConfig.OfflinePruningDataDirectory = vm.config.OfflinePruningDataDirectory
vm.ethConfig.CommitInterval = vm.config.CommitInterval
vm.ethConfig.SkipUpgradeCheck = vm.config.SkipUpgradeCheck
vm.ethConfig.IndexedFilterLogCacheSize = vm.config.IndexedLogCacheSize
vm.ethConfig.UnindexedFilterLogCacheSize = vm.config.UnindexedLogCacheSize

// Create directory for offline pruning
if len(vm.ethConfig.OfflinePruningDataDirectory) != 0 {
Expand Down

0 comments on commit a54eae2

Please sign in to comment.