From bd86bfa7a680ea4b4173e8ffca53b55007637350 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Tue, 17 Dec 2024 10:00:56 -0500 Subject: [PATCH] bump firehose-core, adjusting poller and adding --max-block-fetch-duration --- CHANGELOG.md | 9 +++++++++ blockfetcher/abrone.go | 8 ++++---- blockfetcher/optimism.go | 8 ++++---- blockfetcher/rpc.go | 12 +++++------- cmd/fireeth/poller.go | 29 ++++++++++++++++++----------- go.mod | 4 ++-- go.sum | 8 ++++---- 7 files changed, 46 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1496c054..331c1d76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). See [MAINTAINERS.md](./MAINTAINERS.md) for instructions to keep up to date. +## v2.8.2 + +* Bump firehose-core to [v1.6.8](https://github.com/streamingfast/firehose-core/releases/tag/v1.6.8) +* Substreams: add `--substreams-tier1-enforce-compression` to reject connections from clients that do not support GZIP compression +* Substreams performance: reduced the number of mallocs (patching some third-party libraries) +* Substreams performance: removed heavy tracing (that wasn't exposed to the client) +* Fixed `--reader-node-line-buffer-size` flag that was not being respected in reader-node-stdin app +* poller: add `--max-block-fetch-duration` + ## v2.8.1 * `firehose-grpc-listen-addr` and `substreams-tier1-grpc-listen-addr` flags now accepts comma-separated addresses (allows listening as plaintext and snakeoil-ssl at the same time or on specific ip addresses) diff --git a/blockfetcher/abrone.go b/blockfetcher/abrone.go index 05c18f7e..c391183e 100644 --- a/blockfetcher/abrone.go +++ b/blockfetcher/abrone.go @@ -15,8 +15,8 @@ type ArbOneBlockFetcher struct { fetcher *BlockFetcher } -func NewArbOneBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *OptimismBlockFetcher { - fetcher := NewBlockFetcher(rpcClient, intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger) +func NewArbOneBlockFetcher(intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *OptimismBlockFetcher { + fetcher := NewBlockFetcher(intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger) return &OptimismBlockFetcher{ fetcher: fetcher, } @@ -24,6 +24,6 @@ func NewArbOneBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch time.Dura func (f *ArbOneBlockFetcher) PollingInterval() time.Duration { return 1 * time.Second } -func (f *ArbOneBlockFetcher) Fetch(ctx context.Context, blockNum uint64) (*pbbstream.Block, error) { - return f.fetcher.Fetch(ctx, blockNum) +func (f *ArbOneBlockFetcher) Fetch(ctx context.Context, rpcClient *rpc.Client, blockNum uint64) (*pbbstream.Block, error) { + return f.fetcher.Fetch(ctx, rpcClient, blockNum) } diff --git a/blockfetcher/optimism.go b/blockfetcher/optimism.go index 58ac64c6..dba578b1 100644 --- a/blockfetcher/optimism.go +++ b/blockfetcher/optimism.go @@ -19,13 +19,13 @@ func (f *OptimismBlockFetcher) IsBlockAvailable(requested uint64) bool { return f.fetcher.IsBlockAvailable(requested) } -func (f *OptimismBlockFetcher) Fetch(ctx context.Context, blockNum uint64) (b *pbbstream.Block, skipped bool, err error) { - blk, err := f.fetcher.Fetch(ctx, blockNum) +func (f *OptimismBlockFetcher) Fetch(ctx context.Context, rpcClient *rpc.Client, blockNum uint64) (b *pbbstream.Block, skipped bool, err error) { + blk, err := f.fetcher.Fetch(ctx, rpcClient, blockNum) return blk, false, err } -func NewOptimismBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *OptimismBlockFetcher { - fetcher := NewBlockFetcher(rpcClient, intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger) +func NewOptimismBlockFetcher(intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *OptimismBlockFetcher { + fetcher := NewBlockFetcher(intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger) return &OptimismBlockFetcher{ fetcher: fetcher, } diff --git a/blockfetcher/rpc.go b/blockfetcher/rpc.go index dc3ac588..6624108b 100644 --- a/blockfetcher/rpc.go +++ b/blockfetcher/rpc.go @@ -20,7 +20,6 @@ import ( type ToEthBlock func(in *rpc.Block, receipts map[string]*rpc.TransactionReceipt, logger *zap.Logger) (*pbeth.Block, map[string]bool) type BlockFetcher struct { - rpcClient *rpc.Client latest uint64 latestBlockRetryInterval time.Duration fetchInterval time.Duration @@ -29,9 +28,8 @@ type BlockFetcher struct { logger *zap.Logger } -func NewBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch, latestBlockRetryInterval time.Duration, toEthBlock ToEthBlock, logger *zap.Logger) *BlockFetcher { +func NewBlockFetcher(intervalBetweenFetch, latestBlockRetryInterval time.Duration, toEthBlock ToEthBlock, logger *zap.Logger) *BlockFetcher { return &BlockFetcher{ - rpcClient: rpcClient, latestBlockRetryInterval: latestBlockRetryInterval, toEthBlock: toEthBlock, fetchInterval: intervalBetweenFetch, @@ -43,10 +41,10 @@ func (f *BlockFetcher) IsBlockAvailable(blockNum uint64) bool { return blockNum <= f.latest } -func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbstream.Block, err error) { +func (f *BlockFetcher) Fetch(ctx context.Context, rpcClient *rpc.Client, blockNum uint64) (block *pbbstream.Block, err error) { f.logger.Debug("fetching block", zap.Uint64("block_num", blockNum)) for f.latest < blockNum { - f.latest, err = f.rpcClient.LatestBlockNum(ctx) + f.latest, err = rpcClient.LatestBlockNum(ctx) if err != nil { return nil, fmt.Errorf("fetching latest block num: %w", err) } @@ -65,12 +63,12 @@ func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbst time.Sleep(f.fetchInterval - sinceLastFetch) } - rpcBlock, err := f.rpcClient.GetBlockByNumber(ctx, rpc.BlockNumber(blockNum), rpc.WithGetBlockFullTransaction()) + rpcBlock, err := rpcClient.GetBlockByNumber(ctx, rpc.BlockNumber(blockNum), rpc.WithGetBlockFullTransaction()) if err != nil { return nil, fmt.Errorf("fetching block %d: %w", blockNum, err) } - receipts, err := FetchReceipts(ctx, rpcBlock, f.rpcClient) + receipts, err := FetchReceipts(ctx, rpcBlock, rpcClient) if err != nil { return nil, fmt.Errorf("fetching receipts for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err) } diff --git a/cmd/fireeth/poller.go b/cmd/fireeth/poller.go index d462aac6..8622ad1e 100644 --- a/cmd/fireeth/poller.go +++ b/cmd/fireeth/poller.go @@ -11,6 +11,7 @@ import ( "github.com/streamingfast/eth-go/rpc" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/blockpoller" + firecorerpc "github.com/streamingfast/firehose-core/rpc" "github.com/streamingfast/firehose-ethereum/blockfetcher" "github.com/streamingfast/logging" "go.uber.org/zap" @@ -60,36 +61,42 @@ func newGenericEVMPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Co RunE: pollerRunE(logger, tracer), } cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch") + cmd.Flags().Duration("max-block-fetch-duration", 5*time.Second, "maximum delay before retrying a block fetch") return cmd } func pollerRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor { return func(cmd *cobra.Command, args []string) (err error) { - ctx := cmd.Context() - rpcEndpoint := args[0] //dataDir := cmd.Flag("data-dir").Value.String() + fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch") + maxBlockFetchDuration := sflags.MustGetDuration(cmd, "max-block-fetch-duration") dataDir := sflags.MustGetString(cmd, "data-dir") stateDir := path.Join(dataDir, "poller-state") - logger.Info("launching firehose-ethereum poller", zap.String("rpc_endpoint", rpcEndpoint), zap.String("data_dir", dataDir), zap.String("state_dir", stateDir)) - - rpcClient := rpc.NewClient(rpcEndpoint) - firstStreamableBlock, err := strconv.ParseUint(args[1], 10, 64) if err != nil { return fmt.Errorf("unable to parse first streamable block %d: %w", firstStreamableBlock, err) } - fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch") - - fetcher := blockfetcher.NewOptimismBlockFetcher(rpcClient, fetchInterval, 1*time.Second, logger) + logger.Info("launching firehose-ethereum poller", + zap.String("rpc_endpoint", rpcEndpoint), + zap.String("data_dir", dataDir), + zap.String("state_dir", stateDir), + zap.Duration("fetch_interval", fetchInterval), + zap.Duration("max_block_fetch_duration", maxBlockFetchDuration), + zap.Uint64("first_streamable_block", firstStreamableBlock), + ) + rpcClients := firecorerpc.NewClients[*rpc.Client](maxBlockFetchDuration, firecorerpc.NewStickyRollingStrategy[*rpc.Client](), logger) + rpcClients.Add(rpc.NewClient(rpcEndpoint)) + + fetcher := blockfetcher.NewOptimismBlockFetcher(fetchInterval, 1*time.Second, logger) handler := blockpoller.NewFireBlockHandler("type.googleapis.com/sf.ethereum.type.v2.Block") - poller := blockpoller.New(fetcher, handler, blockpoller.WithStoringState(stateDir), blockpoller.WithLogger(logger)) + poller := blockpoller.New[*rpc.Client](fetcher, handler, rpcClients, blockpoller.WithStoringState[*rpc.Client](stateDir), blockpoller.WithLogger[*rpc.Client](logger)) - err = poller.Run(ctx, firstStreamableBlock, 1) + err = poller.Run(firstStreamableBlock, nil, 1) if err != nil { return fmt.Errorf("running poller: %w", err) } diff --git a/go.mod b/go.mod index 59e47ed4..53630ef1 100644 --- a/go.mod +++ b/go.mod @@ -22,13 +22,13 @@ require ( github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14 github.com/streamingfast/eth-go v0.0.0-20240312122859-216e183c0b7f github.com/streamingfast/firehose v0.1.1-0.20240118135215-dcf04d40bfcd - github.com/streamingfast/firehose-core v1.6.7 + github.com/streamingfast/firehose-core v1.6.8 github.com/streamingfast/firehose-ethereum/types v0.0.0-20240603154554-acc011d4f8c4 github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb github.com/streamingfast/shutter v1.5.0 - github.com/streamingfast/substreams v1.11.2-0.20241202193558-30d991758e7c + github.com/streamingfast/substreams v1.11.3 github.com/stretchr/testify v1.9.0 github.com/test-go/testify v1.1.4 github.com/tidwall/gjson v1.18.0 diff --git a/go.sum b/go.sum index 72b18c23..b3491b8a 100644 --- a/go.sum +++ b/go.sum @@ -2163,8 +2163,8 @@ github.com/streamingfast/eth-go v0.0.0-20240312122859-216e183c0b7f h1:1LwsmRVENf github.com/streamingfast/eth-go v0.0.0-20240312122859-216e183c0b7f/go.mod h1:UEm8dqibr3c3A1iIA3CHpkhN7j3X78prN7/55sXf3A0= github.com/streamingfast/firehose v0.1.1-0.20240118135215-dcf04d40bfcd h1:t5n8dDcgUi7t36Qwxm19K4H2vyOLJfY6MHxTbOvK1z8= github.com/streamingfast/firehose v0.1.1-0.20240118135215-dcf04d40bfcd/go.mod h1:du6tys2Q6X2pRQ3JbCziWiy7Y7KrOcl4CSb9uiGsVxA= -github.com/streamingfast/firehose-core v1.6.7 h1:/c/WiScYKdyVPRKFyBEPkg9trSd7MudaroOcaCbF+R4= -github.com/streamingfast/firehose-core v1.6.7/go.mod h1:etw8aLLRDT+rSI6tYh33XTg58OEtCrr9idWsjsvYOYs= +github.com/streamingfast/firehose-core v1.6.8 h1:vDnb+USaBFI1JejhyI+D8LeF4s4KJe887eRIfjzIAik= +github.com/streamingfast/firehose-core v1.6.8/go.mod h1:7IyMnJbDycP7ELEXTexwR2Hr5wyqu05Ebh9uYLKNKlQ= github.com/streamingfast/firehose-ethereum/types v0.0.0-20240603154554-acc011d4f8c4 h1:gr6ew/RxqQNDtDejLsNE3oAkvyitrtlj15alAjRDKs8= github.com/streamingfast/firehose-ethereum/types v0.0.0-20240603154554-acc011d4f8c4/go.mod h1:CG22ObinxSbKIP19bAj0uro0a290kzZTiBbjL8VR0SE= github.com/streamingfast/google-cloud-go v0.0.0-20241202191831-95d7819ab4ad h1:dN+8f4S/YhLlcEANrcP8/5Yd/AUzPoV+wQ9Ts7qFZ5o= @@ -2192,8 +2192,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= -github.com/streamingfast/substreams v1.11.2-0.20241202193558-30d991758e7c h1:nMc8fGRXc97t4vBsNMCDGQrG1o9nc5Yf+LO+Dc4orcA= -github.com/streamingfast/substreams v1.11.2-0.20241202193558-30d991758e7c/go.mod h1:6qjmyNq0INxC/3EXF4M2QEf1Su7X8Lq2FCKfGIFXupU= +github.com/streamingfast/substreams v1.11.3 h1:enNRo1M67Pzkx1vzGWj+M2birgZvC3n46mUcbdUeLQA= +github.com/streamingfast/substreams v1.11.3/go.mod h1:6qjmyNq0INxC/3EXF4M2QEf1Su7X8Lq2FCKfGIFXupU= github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed h1:LU6/c376zP1cMAo9L6rFLyjo0W7RU+hIh7BegH8Zo5M= github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=