From 05574d87bf59565bce87901aaa48694b12ba7ed1 Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Thu, 16 Jan 2025 12:54:47 -0500 Subject: [PATCH] Feature/max tier1 requests (#72) * Fixed `firecore.GetTmpDir` not returning `tmpDir` after first call if already created * Added control knob for substreams tier1 max active requests # Conflicts: # CHANGELOG.md # go.mod # go.sum # test/integration_test.go --- CHANGELOG.md | 16 ++++++++++++++ cmd/apps/substreams_tier1.go | 41 ++++++++++++++++++++++++++---------- devel/standard/standard.yaml | 4 +++- go.mod | 4 ++-- go.sum | 8 +++---- storage.go | 2 +- storage_test.go | 23 ++++++++++++++++++++ utils.go | 4 +++- 8 files changed, 82 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30acfa0..9d5acac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,22 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s ### Substreams +* The `substreams-tier1` app now has two new configuration flags named respectively `substreams-tier1-active-requests-soft-limit` and `substreams-tier1-active-requests-hard-limit` +helping better load balance active requests across a pool of `tier1` instances. + + The `substreams-tier1-active-requests-soft-limit` limits the number of client active requests that a tier1 accepts before starting + to be report itself as 'unready' within the health check endpoint. A limit of 0 or less means no limit. + + This is useful to load balance active requests more easily across a pool of tier1 instance. When the instance reaches the soft + limit, it will start to be unready from the load balancer standpoint. The load balancer in return will remove it from the list + of available instances, and new connections will be routed to remaining clients, spreading the load. + + The `substreams-tier1-active-requests-hard-limit` limits the number of client active requests that a tier1 accepts before + rejecting incoming gRPC requests with 'Unavailable' code and setting itself as unready. A limit of 0 or less means no limit. + + This is useful to prevent the tier1 from being overwhelmed by too many requests, most client auto-reconnects on 'Unavailable' code + so they should end up on another tier1 instance, assuming you have proper auto-scaling of the number of instances available. + * Properly accept and compress responses with `gzip` for browser HTTP clients using ConnectWeb with `Accept-Encoding` header * Allow setting subscription channel max capacity via `SOURCE_CHAN_SIZE` env var (default: 100) diff --git a/cmd/apps/substreams_tier1.go b/cmd/apps/substreams_tier1.go index 04d9eff..792e560 100644 --- a/cmd/apps/substreams_tier1.go +++ b/cmd/apps/substreams_tier1.go @@ -22,6 +22,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/streamingfast/cli" "github.com/streamingfast/dauth" discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service" firecore "github.com/streamingfast/firehose-core" @@ -52,7 +53,21 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root cmd.Flags().Bool("substreams-tier1-enforce-compression", true, "Reject any request that does not accept gzip or zstd encoding in their GRPC/Connect header") cmd.Flags().Int("substreams-tier1-max-subrequests", 4, "number of parallel subrequests that the tier1 can make to the tier2 per request") cmd.Flags().String("substreams-tier1-block-type", "", "Block type to use for the substreams tier1 (Ex: sf.ethereum.type.v2.Block)") - + cmd.Flags().Int("substreams-tier1-active-requests-soft-limit", 0, cli.FlagDescription(` + The number of client active requests that a tier1 accepts before starting to be report itself as 'unready' within the health + check endpoint. A limit of 0 or less means no limit. + + This is useful to load balance active requests more easily across a pool of tier1 instance. When the instance reaches the soft + limit, it will start to be unready from the load balancer standpoint. The load balancer in return will remove it from the list + of available instances, and new connections will be routed to remaining clients, spreading the load. + `)) + cmd.Flags().Int("substreams-tier1-active-requests-hard-limit", 0, cli.FlagDescription(` + The maximum number of client active requests that a tier1 accepts before rejecting incoming gRPC requests with 'Unavailable' code + and setting itself as unready. A limit of 0 or less means no limit. + + This is useful to prevent the tier1 from being overwhelmed by too many requests, most client auto-reconnects on 'Unavailable' code + so they should end up on another tier1 instance, assuming you have proper auto-scaling of the number of instances available. + `)) // all substreams registerCommonSubstreamsFlags(cmd) return nil @@ -86,6 +101,8 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root subrequestsInsecure := viper.GetBool("substreams-tier1-subrequests-insecure") subrequestsPlaintext := viper.GetBool("substreams-tier1-subrequests-plaintext") maxSubrequests := viper.GetUint64("substreams-tier1-max-subrequests") + activeRequestsSoftLimit := viper.GetInt("substreams-tier1-active-requests-soft-limit") + activeRequestsHardLimit := viper.GetInt("substreams-tier1-active-requests-hard-limit") var blockType string if chain.DefaultBlockType != "" { @@ -137,16 +154,18 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root BlockStreamAddr: blockstreamAddr, TmpDir: tmpDir, - StateStoreURL: stateStoreURL, - StateStoreDefaultTag: stateStoreDefaultTag, - StateBundleSize: stateBundleSize, - MaxSubrequests: maxSubrequests, - SubrequestsEndpoint: subrequestsEndpoint, - SubrequestsInsecure: subrequestsInsecure, - SubrequestsPlaintext: subrequestsPlaintext, - BlockType: blockType, - WASMExtensions: wasmExtensions, - BlockExecutionTimeout: executionTimeout, + StateStoreURL: stateStoreURL, + StateStoreDefaultTag: stateStoreDefaultTag, + StateBundleSize: stateBundleSize, + MaxSubrequests: maxSubrequests, + SubrequestsEndpoint: subrequestsEndpoint, + ActiveRequestsSoftLimit: activeRequestsSoftLimit, + ActiveRequestsHardLimit: activeRequestsHardLimit, + SubrequestsInsecure: subrequestsInsecure, + SubrequestsPlaintext: subrequestsPlaintext, + BlockType: blockType, + WASMExtensions: wasmExtensions, + BlockExecutionTimeout: executionTimeout, Tracing: tracing, diff --git a/devel/standard/standard.yaml b/devel/standard/standard.yaml index 7736f8a..26a2f79 100644 --- a/devel/standard/standard.yaml +++ b/devel/standard/standard.yaml @@ -4,6 +4,8 @@ start: - merger - relayer - firehose + - substreams-tier1 + - substreams-tier2 flags: advertise-block-id-encoding: "hex" advertise-chain-name: "acme-dummy-blockchain" @@ -20,4 +22,4 @@ start: --store-dir="{node-data-dir}" --block-rate=120 --genesis-height=0 - --genesis-block-burst=1000 + --genesis-block-burst=100 diff --git a/go.mod b/go.mod index 71a636e..1c5c4b4 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d - github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 + github.com/streamingfast/cli v0.0.4-0.20250116003948-fbf66c930cce github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 @@ -31,7 +31,7 @@ require ( github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 github.com/streamingfast/pbgo v0.0.6-0.20250114182320-0b43084f4000 github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 - github.com/streamingfast/substreams v1.11.4-0.20250116013259-ebf5362125de + github.com/streamingfast/substreams v1.11.4-0.20250116174758-7b0afb88692e github.com/stretchr/testify v1.9.0 github.com/test-go/testify v1.1.4 go.uber.org/multierr v1.10.0 diff --git a/go.sum b/go.sum index ff06694..bb62cd5 100644 --- a/go.sum +++ b/go.sum @@ -2138,8 +2138,8 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d h1:5cGG1t9rwbAwXeTq9epU7hm6cBsC2V8DM2jVCIN6JSo= github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= -github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 h1:nwuFSEJtQfqTuN62WvysfAtDT4qqwQ6ghFX0i2VY1fY= -github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng= +github.com/streamingfast/cli v0.0.4-0.20250116003948-fbf66c930cce h1:RWla1PaRrlDf/MOwVoN/dJhIM/dXa9O4rmKZkv9T5bg= +github.com/streamingfast/cli v0.0.4-0.20250116003948-fbf66c930cce/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng= github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU= github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84/go.mod h1:cwfI5vaMd+CiwZIL0H0JdP5UDWCZOVFz/ex3L0+o/j4= github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c h1:6WjE2yInE+5jnI7cmCcxOiGZiEs2FQm9Zsg2a9Ivp0Q= @@ -2181,8 +2181,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= -github.com/streamingfast/substreams v1.11.4-0.20250116013259-ebf5362125de h1:4MH2yq5LqUZJZTR217lD+bZln5+q6FDkD70zwO3nbks= -github.com/streamingfast/substreams v1.11.4-0.20250116013259-ebf5362125de/go.mod h1:Dgbt37alWqMyahFQ4rdhX8iFLZHn2qD8TBhcP3NIuW8= +github.com/streamingfast/substreams v1.11.4-0.20250116174758-7b0afb88692e h1:9pk6d5QKvVLMl5TXSXKb8b0VMmEVh6e3kca200yIuk8= +github.com/streamingfast/substreams v1.11.4-0.20250116174758-7b0afb88692e/go.mod h1:Dgbt37alWqMyahFQ4rdhX8iFLZHn2qD8TBhcP3NIuW8= github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed h1:LU6/c376zP1cMAo9L6rFLyjo0W7RU+hIh7BegH8Zo5M= github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/storage.go b/storage.go index f12b196..a6eb6f8 100644 --- a/storage.go +++ b/storage.go @@ -17,11 +17,11 @@ var indexStoreCreated bool var tmpDirCreated bool func GetTmpDir(dataDir string) (tmpDir string, err error) { + tmpDir = MustReplaceDataDir(dataDir, viperExpandedEnvGetString("common-tmp-dir")) if tmpDirCreated { return } - tmpDir = MustReplaceDataDir(dataDir, viperExpandedEnvGetString("common-tmp-dir")) err = os.MkdirAll(tmpDir, 0755) tmpDirCreated = true return diff --git a/storage_test.go b/storage_test.go index 246e070..2e84b41 100644 --- a/storage_test.go +++ b/storage_test.go @@ -1,8 +1,10 @@ package firecore import ( + "path/filepath" "testing" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -47,3 +49,24 @@ func Test_searchBlockNum(t *testing.T) { func uptr(v uint64) *uint64 { return &v } + +func TestGetTmpDir(t *testing.T) { + dataDir := t.TempDir() + testSetViper(t, "common-tmp-dir", "{data-dir}/value") + + dir, err := GetTmpDir(dataDir) + assert.NoError(t, err) + assert.Equal(t, filepath.Join(dataDir, "value"), dir) + + dir2, err2 := GetTmpDir(dataDir) + assert.NoError(t, err2) + assert.Equal(t, filepath.Join(dataDir, "value"), dir2) +} + +func testSetViper(t *testing.T, key string, value string) { + current := viper.Get(key) + t.Cleanup(func() { + viper.Set(key, current) + }) + viper.Set(key, value) +} diff --git a/utils.go b/utils.go index 458e66e..a481dd6 100644 --- a/utils.go +++ b/utils.go @@ -50,7 +50,9 @@ func MakeDirs(directories []string) error { } // MustReplaceDataDir replaces `{data-dir}` from within the `in` received argument by the -// `dataDir` argument +// `dataDir` argument. +// +// MustReplaceDataDir("/tmp/data", "{data-dir}/subdir") == "/tmp/data/subdir" func MustReplaceDataDir(dataDir, in string) string { d, err := filepath.Abs(dataDir) if err != nil {