Skip to content

Commit

Permalink
bump firehose-core, adjusting poller and adding --max-block-fetch-dur…
Browse files Browse the repository at this point in the history
…ation
  • Loading branch information
sduchesneau committed Dec 17, 2024
1 parent 793218b commit bd86bfa
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 32 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). See [MAINTAINERS.md](./MAINTAINERS.md)
for instructions to keep up to date.

## v2.8.2

* Bump firehose-core to [v1.6.8](https://github.com/streamingfast/firehose-core/releases/tag/v1.6.8)
* Substreams: add `--substreams-tier1-enforce-compression` to reject connections from clients that do not support GZIP compression
* Substreams performance: reduced the number of mallocs (patching some third-party libraries)
* Substreams performance: removed heavy tracing (that wasn't exposed to the client)
* Fixed `--reader-node-line-buffer-size` flag that was not being respected in reader-node-stdin app
* poller: add `--max-block-fetch-duration`

## v2.8.1

* `firehose-grpc-listen-addr` and `substreams-tier1-grpc-listen-addr` flags now accepts comma-separated addresses (allows listening as plaintext and snakeoil-ssl at the same time or on specific ip addresses)
Expand Down
8 changes: 4 additions & 4 deletions blockfetcher/abrone.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ type ArbOneBlockFetcher struct {
fetcher *BlockFetcher
}

func NewArbOneBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *OptimismBlockFetcher {
fetcher := NewBlockFetcher(rpcClient, intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger)
func NewArbOneBlockFetcher(intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *OptimismBlockFetcher {
fetcher := NewBlockFetcher(intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger)
return &OptimismBlockFetcher{
fetcher: fetcher,
}
}

func (f *ArbOneBlockFetcher) PollingInterval() time.Duration { return 1 * time.Second }

func (f *ArbOneBlockFetcher) Fetch(ctx context.Context, blockNum uint64) (*pbbstream.Block, error) {
return f.fetcher.Fetch(ctx, blockNum)
func (f *ArbOneBlockFetcher) Fetch(ctx context.Context, rpcClient *rpc.Client, blockNum uint64) (*pbbstream.Block, error) {
return f.fetcher.Fetch(ctx, rpcClient, blockNum)
}
8 changes: 4 additions & 4 deletions blockfetcher/optimism.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ func (f *OptimismBlockFetcher) IsBlockAvailable(requested uint64) bool {
return f.fetcher.IsBlockAvailable(requested)
}

func (f *OptimismBlockFetcher) Fetch(ctx context.Context, blockNum uint64) (b *pbbstream.Block, skipped bool, err error) {
blk, err := f.fetcher.Fetch(ctx, blockNum)
func (f *OptimismBlockFetcher) Fetch(ctx context.Context, rpcClient *rpc.Client, blockNum uint64) (b *pbbstream.Block, skipped bool, err error) {
blk, err := f.fetcher.Fetch(ctx, rpcClient, blockNum)
return blk, false, err
}

func NewOptimismBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *OptimismBlockFetcher {
fetcher := NewBlockFetcher(rpcClient, intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger)
func NewOptimismBlockFetcher(intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *OptimismBlockFetcher {
fetcher := NewBlockFetcher(intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger)
return &OptimismBlockFetcher{
fetcher: fetcher,
}
Expand Down
12 changes: 5 additions & 7 deletions blockfetcher/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
type ToEthBlock func(in *rpc.Block, receipts map[string]*rpc.TransactionReceipt, logger *zap.Logger) (*pbeth.Block, map[string]bool)

type BlockFetcher struct {
rpcClient *rpc.Client
latest uint64
latestBlockRetryInterval time.Duration
fetchInterval time.Duration
Expand All @@ -29,9 +28,8 @@ type BlockFetcher struct {
logger *zap.Logger
}

func NewBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch, latestBlockRetryInterval time.Duration, toEthBlock ToEthBlock, logger *zap.Logger) *BlockFetcher {
func NewBlockFetcher(intervalBetweenFetch, latestBlockRetryInterval time.Duration, toEthBlock ToEthBlock, logger *zap.Logger) *BlockFetcher {
return &BlockFetcher{
rpcClient: rpcClient,
latestBlockRetryInterval: latestBlockRetryInterval,
toEthBlock: toEthBlock,
fetchInterval: intervalBetweenFetch,
Expand All @@ -43,10 +41,10 @@ func (f *BlockFetcher) IsBlockAvailable(blockNum uint64) bool {
return blockNum <= f.latest
}

func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbstream.Block, err error) {
func (f *BlockFetcher) Fetch(ctx context.Context, rpcClient *rpc.Client, blockNum uint64) (block *pbbstream.Block, err error) {
f.logger.Debug("fetching block", zap.Uint64("block_num", blockNum))
for f.latest < blockNum {
f.latest, err = f.rpcClient.LatestBlockNum(ctx)
f.latest, err = rpcClient.LatestBlockNum(ctx)
if err != nil {
return nil, fmt.Errorf("fetching latest block num: %w", err)
}
Expand All @@ -65,12 +63,12 @@ func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbst
time.Sleep(f.fetchInterval - sinceLastFetch)
}

rpcBlock, err := f.rpcClient.GetBlockByNumber(ctx, rpc.BlockNumber(blockNum), rpc.WithGetBlockFullTransaction())
rpcBlock, err := rpcClient.GetBlockByNumber(ctx, rpc.BlockNumber(blockNum), rpc.WithGetBlockFullTransaction())
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blockNum, err)
}

receipts, err := FetchReceipts(ctx, rpcBlock, f.rpcClient)
receipts, err := FetchReceipts(ctx, rpcBlock, rpcClient)
if err != nil {
return nil, fmt.Errorf("fetching receipts for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err)
}
Expand Down
29 changes: 18 additions & 11 deletions cmd/fireeth/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/streamingfast/eth-go/rpc"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/blockpoller"
firecorerpc "github.com/streamingfast/firehose-core/rpc"
"github.com/streamingfast/firehose-ethereum/blockfetcher"
"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand Down Expand Up @@ -60,36 +61,42 @@ func newGenericEVMPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Co
RunE: pollerRunE(logger, tracer),
}
cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch")
cmd.Flags().Duration("max-block-fetch-duration", 5*time.Second, "maximum delay before retrying a block fetch")

return cmd
}

func pollerRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()

rpcEndpoint := args[0]
//dataDir := cmd.Flag("data-dir").Value.String()
fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch")
maxBlockFetchDuration := sflags.MustGetDuration(cmd, "max-block-fetch-duration")

dataDir := sflags.MustGetString(cmd, "data-dir")
stateDir := path.Join(dataDir, "poller-state")

logger.Info("launching firehose-ethereum poller", zap.String("rpc_endpoint", rpcEndpoint), zap.String("data_dir", dataDir), zap.String("state_dir", stateDir))

rpcClient := rpc.NewClient(rpcEndpoint)

firstStreamableBlock, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return fmt.Errorf("unable to parse first streamable block %d: %w", firstStreamableBlock, err)
}

fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch")

fetcher := blockfetcher.NewOptimismBlockFetcher(rpcClient, fetchInterval, 1*time.Second, logger)
logger.Info("launching firehose-ethereum poller",
zap.String("rpc_endpoint", rpcEndpoint),
zap.String("data_dir", dataDir),
zap.String("state_dir", stateDir),
zap.Duration("fetch_interval", fetchInterval),
zap.Duration("max_block_fetch_duration", maxBlockFetchDuration),
zap.Uint64("first_streamable_block", firstStreamableBlock),
)
rpcClients := firecorerpc.NewClients[*rpc.Client](maxBlockFetchDuration, firecorerpc.NewStickyRollingStrategy[*rpc.Client](), logger)
rpcClients.Add(rpc.NewClient(rpcEndpoint))

fetcher := blockfetcher.NewOptimismBlockFetcher(fetchInterval, 1*time.Second, logger)
handler := blockpoller.NewFireBlockHandler("type.googleapis.com/sf.ethereum.type.v2.Block")
poller := blockpoller.New(fetcher, handler, blockpoller.WithStoringState(stateDir), blockpoller.WithLogger(logger))
poller := blockpoller.New[*rpc.Client](fetcher, handler, rpcClients, blockpoller.WithStoringState[*rpc.Client](stateDir), blockpoller.WithLogger[*rpc.Client](logger))

err = poller.Run(ctx, firstStreamableBlock, 1)
err = poller.Run(firstStreamableBlock, nil, 1)
if err != nil {
return fmt.Errorf("running poller: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ require (
github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14
github.com/streamingfast/eth-go v0.0.0-20240312122859-216e183c0b7f
github.com/streamingfast/firehose v0.1.1-0.20240118135215-dcf04d40bfcd
github.com/streamingfast/firehose-core v1.6.7
github.com/streamingfast/firehose-core v1.6.8
github.com/streamingfast/firehose-ethereum/types v0.0.0-20240603154554-acc011d4f8c4
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb
github.com/streamingfast/shutter v1.5.0
github.com/streamingfast/substreams v1.11.2-0.20241202193558-30d991758e7c
github.com/streamingfast/substreams v1.11.3
github.com/stretchr/testify v1.9.0
github.com/test-go/testify v1.1.4
github.com/tidwall/gjson v1.18.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2163,8 +2163,8 @@ github.com/streamingfast/eth-go v0.0.0-20240312122859-216e183c0b7f h1:1LwsmRVENf
github.com/streamingfast/eth-go v0.0.0-20240312122859-216e183c0b7f/go.mod h1:UEm8dqibr3c3A1iIA3CHpkhN7j3X78prN7/55sXf3A0=
github.com/streamingfast/firehose v0.1.1-0.20240118135215-dcf04d40bfcd h1:t5n8dDcgUi7t36Qwxm19K4H2vyOLJfY6MHxTbOvK1z8=
github.com/streamingfast/firehose v0.1.1-0.20240118135215-dcf04d40bfcd/go.mod h1:du6tys2Q6X2pRQ3JbCziWiy7Y7KrOcl4CSb9uiGsVxA=
github.com/streamingfast/firehose-core v1.6.7 h1:/c/WiScYKdyVPRKFyBEPkg9trSd7MudaroOcaCbF+R4=
github.com/streamingfast/firehose-core v1.6.7/go.mod h1:etw8aLLRDT+rSI6tYh33XTg58OEtCrr9idWsjsvYOYs=
github.com/streamingfast/firehose-core v1.6.8 h1:vDnb+USaBFI1JejhyI+D8LeF4s4KJe887eRIfjzIAik=
github.com/streamingfast/firehose-core v1.6.8/go.mod h1:7IyMnJbDycP7ELEXTexwR2Hr5wyqu05Ebh9uYLKNKlQ=
github.com/streamingfast/firehose-ethereum/types v0.0.0-20240603154554-acc011d4f8c4 h1:gr6ew/RxqQNDtDejLsNE3oAkvyitrtlj15alAjRDKs8=
github.com/streamingfast/firehose-ethereum/types v0.0.0-20240603154554-acc011d4f8c4/go.mod h1:CG22ObinxSbKIP19bAj0uro0a290kzZTiBbjL8VR0SE=
github.com/streamingfast/google-cloud-go v0.0.0-20241202191831-95d7819ab4ad h1:dN+8f4S/YhLlcEANrcP8/5Yd/AUzPoV+wQ9Ts7qFZ5o=
Expand Down Expand Up @@ -2192,8 +2192,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt
github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4=
github.com/streamingfast/substreams v1.11.2-0.20241202193558-30d991758e7c h1:nMc8fGRXc97t4vBsNMCDGQrG1o9nc5Yf+LO+Dc4orcA=
github.com/streamingfast/substreams v1.11.2-0.20241202193558-30d991758e7c/go.mod h1:6qjmyNq0INxC/3EXF4M2QEf1Su7X8Lq2FCKfGIFXupU=
github.com/streamingfast/substreams v1.11.3 h1:enNRo1M67Pzkx1vzGWj+M2birgZvC3n46mUcbdUeLQA=
github.com/streamingfast/substreams v1.11.3/go.mod h1:6qjmyNq0INxC/3EXF4M2QEf1Su7X8Lq2FCKfGIFXupU=
github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed h1:LU6/c376zP1cMAo9L6rFLyjo0W7RU+hIh7BegH8Zo5M=
github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down

0 comments on commit bd86bfa

Please sign in to comment.