From 76469b52cd66c62763cb93ced35b70914bdd41a3 Mon Sep 17 00:00:00 2001 From: Charles Billette Date: Tue, 26 Nov 2024 09:14:55 -0500 Subject: [PATCH] Add sorting functionality and new rolling strategies Introduced a sorting mechanism for clients in the `rpc` package. Added interfaces and implementations for sticky rolling strategy, along with comprehensive test cases. Improved thread safety and replaced the old rolling strategy implementations. --- blockpoller/fetcher.go | 4 ++ blockpoller/poller.go | 2 - rpc/client_test.go | 50 ---------------- rpc/clients.go | 110 ++++++++--------------------------- rpc/rolling_strategy.go | 96 ++++++++++++++++++++++++++++++ rpc/rolling_strategy_test.go | 56 ++++++++++++++++++ rpc/sort.go | 54 +++++++++++++++++ rpc/sort_test.go | 42 +++++++++++++ 8 files changed, 275 insertions(+), 139 deletions(-) create mode 100644 rpc/rolling_strategy.go create mode 100644 rpc/rolling_strategy_test.go create mode 100644 rpc/sort.go create mode 100644 rpc/sort_test.go diff --git a/blockpoller/fetcher.go b/blockpoller/fetcher.go index 269508a..c037e4c 100644 --- a/blockpoller/fetcher.go +++ b/blockpoller/fetcher.go @@ -10,3 +10,7 @@ type BlockFetcher[C any] interface { IsBlockAvailable(requestedSlot uint64) bool Fetch(ctx context.Context, client C, blkNum uint64) (b *pbbstream.Block, skipped bool, err error) } + +type HeadBlockNumberFetcher[C any] interface { + FetchHeadBlockNumber(ctx context.Context, client C) (uint64, error) +} diff --git a/blockpoller/poller.go b/blockpoller/poller.go index 040e267..3daaed8 100644 --- a/blockpoller/poller.go +++ b/blockpoller/poller.go @@ -73,8 +73,6 @@ func New[C any]( return b } -var MaxStopBlock *uint64 = nil - func (p *BlockPoller[C]) Run(firstStreamableBlockNum uint64, stopBlock *uint64, blockFetchBatchSize int) error { p.startBlockNumGate = firstStreamableBlockNum p.logger.Info("starting poller", diff --git a/rpc/client_test.go b/rpc/client_test.go index 4900a04..9ab1e3e 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -1,51 +1 @@ package rpc - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -type rollClient struct { - callCount int - name string -} - -func TestRollingStrategy(t *testing.T) { - - rollingStrategy := NewRollingStrategyRoundRobin[*rollClient]() - rollingStrategy.reset() - - clients := NewClients(2*time.Second, rollingStrategy) - clients.Add(&rollClient{name: "c.1"}) - clients.Add(&rollClient{name: "c.2"}) - clients.Add(&rollClient{name: "c.3"}) - clients.Add(&rollClient{name: "c.a"}) - clients.Add(&rollClient{name: "c.b"}) - - var clientNames []string - _, err := WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) { - clientNames = append(clientNames, client.name) - if client.name == "c.3" { - return nil, nil - } - - return nil, fmt.Errorf("next please") - }) - - require.NoError(t, err) - //require.ErrorIs(t, err, ErrorNoMoreClient) - require.Equal(t, []string{"c.1", "c.2", "c.3"}, clientNames) - - _, err = WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) { - clientNames = append(clientNames, client.name) - return nil, fmt.Errorf("next please") - }) - - require.ErrorIs(t, err, ErrorNoMoreClient) - require.Equal(t, []string{"c.1", "c.2", "c.3", "c.3", "c.a", "c.b", "c.1", "c.2"}, clientNames) - -} diff --git a/rpc/clients.go b/rpc/clients.go index 66e6986..b326335 100644 --- a/rpc/clients.go +++ b/rpc/clients.go @@ -3,9 +3,11 @@ package rpc import ( "context" "errors" + "sync" "time" "github.com/hashicorp/go-multierror" + "go.uber.org/zap" ) var ErrorNoMoreClient = errors.New("no more clients") @@ -14,20 +16,40 @@ type Clients[C any] struct { clients []C maxBlockFetchDuration time.Duration rollingStrategy RollingStrategy[C] + lock sync.Mutex + logger *zap.Logger } -func NewClients[C any](maxBlockFetchDuration time.Duration, rollingStrategy RollingStrategy[C]) *Clients[C] { +func NewClients[C any](maxBlockFetchDuration time.Duration, rollingStrategy RollingStrategy[C], logger *zap.Logger) *Clients[C] { return &Clients[C]{ maxBlockFetchDuration: maxBlockFetchDuration, rollingStrategy: rollingStrategy, + logger: logger, } } +func (c *Clients[C]) StartSorting(ctx context.Context, direction SortDirection, every time.Duration) { + go func() { + for { + c.logger.Info("sorting clients") + err := Sort(ctx, c, direction) + if err != nil { + c.logger.Warn("sorting", zap.Error(err)) + } + time.Sleep(every) + } + }() +} + func (c *Clients[C]) Add(client C) { + c.lock.Lock() + defer c.lock.Unlock() c.clients = append(c.clients, client) } func WithClients[C any, V any](clients *Clients[C], f func(context.Context, C) (v V, err error)) (v V, err error) { + clients.lock.Lock() + defer clients.lock.Unlock() var errs error clients.rollingStrategy.reset() @@ -58,89 +80,3 @@ func WithClients[C any, V any](clients *Clients[C], f func(context.Context, C) ( return v, nil } } - -type RollingStrategy[C any] interface { - reset() - next(clients *Clients[C]) (C, error) -} - -type RollingStrategyRoundRobin[C any] struct { - fistCallToNewClient bool - usedClientCount int - nextClientIndex int -} - -func NewRollingStrategyRoundRobin[C any]() RollingStrategy[C] { - return &RollingStrategyRoundRobin[C]{ - fistCallToNewClient: true, - } -} - -func (s *RollingStrategyRoundRobin[C]) reset() { - s.usedClientCount = 0 -} -func (s *RollingStrategyRoundRobin[C]) next(clients *Clients[C]) (client C, err error) { - if len(clients.clients) == s.usedClientCount { - return client, ErrorNoMoreClient - } - - if s.fistCallToNewClient { - s.fistCallToNewClient = false - client = clients.clients[0] - s.usedClientCount = s.usedClientCount + 1 - s.nextClientIndex = s.nextClientIndex + 1 - return client, nil - } - - if s.nextClientIndex == len(clients.clients) { //roll to 1st client - s.nextClientIndex = 0 - } - - if s.usedClientCount == 0 { //just been reset - s.nextClientIndex = s.prevIndex(clients) - client = clients.clients[s.nextClientIndex] - s.usedClientCount = s.usedClientCount + 1 - s.nextClientIndex = s.nextClientIndex + 1 - return client, nil - } - - if s.nextClientIndex == len(clients.clients) { //roll to 1st client - client = clients.clients[0] - s.usedClientCount = s.usedClientCount + 1 - return client, nil - } - - client = clients.clients[s.nextClientIndex] - s.usedClientCount = s.usedClientCount + 1 - s.nextClientIndex = s.nextClientIndex + 1 - return client, nil -} - -func (s *RollingStrategyRoundRobin[C]) prevIndex(clients *Clients[C]) int { - if s.nextClientIndex == 0 { - return len(clients.clients) - 1 - } - return s.nextClientIndex - 1 -} - -type RollingStrategyAlwaysUseFirst[C any] struct { - nextIndex int -} - -func NewRollingStrategyAlwaysUseFirst[C any]() *RollingStrategyAlwaysUseFirst[C] { - return &RollingStrategyAlwaysUseFirst[C]{} -} - -func (s *RollingStrategyAlwaysUseFirst[C]) reset() { - s.nextIndex = 0 -} - -func (s *RollingStrategyAlwaysUseFirst[C]) next(c *Clients[C]) (client C, err error) { - if len(c.clients) <= s.nextIndex { - return client, ErrorNoMoreClient - } - client = c.clients[s.nextIndex] - s.nextIndex++ - return client, nil - -} diff --git a/rpc/rolling_strategy.go b/rpc/rolling_strategy.go new file mode 100644 index 0000000..7b8c5a5 --- /dev/null +++ b/rpc/rolling_strategy.go @@ -0,0 +1,96 @@ +package rpc + +type RollingStrategy[C any] interface { + reset() + next(clients *Clients[C]) (C, error) +} + +type StickyRollingStrategy[C any] struct { + fistCallToNewClient bool + usedClientCount int + nextClientIndex int +} + +func NewStickyRollingStrategy[C any]() *StickyRollingStrategy[C] { + return &StickyRollingStrategy[C]{ + fistCallToNewClient: true, + } +} + +func (s *StickyRollingStrategy[C]) reset() { + s.usedClientCount = 0 +} +func (s *StickyRollingStrategy[C]) next(clients *Clients[C]) (client C, err error) { + clients.lock.Lock() + defer clients.lock.Unlock() + + if len(clients.clients) == s.usedClientCount { + return client, ErrorNoMoreClient + } + + if s.fistCallToNewClient { + s.fistCallToNewClient = false + client = clients.clients[0] + s.usedClientCount = s.usedClientCount + 1 + s.nextClientIndex = s.nextClientIndex + 1 + return client, nil + } + + if s.nextClientIndex == len(clients.clients) { //roll to 1st client + s.nextClientIndex = 0 + } + + if s.usedClientCount == 0 { //just been reset + s.nextClientIndex = s.prevIndex(clients) + client = clients.clients[s.nextClientIndex] + s.usedClientCount = s.usedClientCount + 1 + s.nextClientIndex = s.nextClientIndex + 1 + return client, nil + } + + if s.nextClientIndex == len(clients.clients) { //roll to 1st client + client = clients.clients[0] + s.usedClientCount = s.usedClientCount + 1 + return client, nil + } + + client = clients.clients[s.nextClientIndex] + s.usedClientCount = s.usedClientCount + 1 + s.nextClientIndex = s.nextClientIndex + 1 + return client, nil +} + +func (s *StickyRollingStrategy[C]) prevIndex(clients *Clients[C]) int { + clients.lock.Lock() + defer clients.lock.Unlock() + + if s.nextClientIndex == 0 { + return len(clients.clients) - 1 + } + return s.nextClientIndex - 1 +} + +type RollingStrategyAlwaysUseFirst[C any] struct { + nextIndex int +} + +func NewRollingStrategyAlwaysUseFirst[C any]() *RollingStrategyAlwaysUseFirst[C] { + return &RollingStrategyAlwaysUseFirst[C]{} +} + +func (s *RollingStrategyAlwaysUseFirst[C]) reset() { + s.nextIndex = 0 +} + +func (s *RollingStrategyAlwaysUseFirst[C]) next(c *Clients[C]) (client C, err error) { + c.lock.Lock() + defer c.lock.Unlock() + + if len(c.clients) <= s.nextIndex { + return client, ErrorNoMoreClient + } + client = c.clients[s.nextIndex] + s.nextIndex++ + return client, nil + +} diff --git a/rpc/rolling_strategy_test.go b/rpc/rolling_strategy_test.go new file mode 100644 index 0000000..d63c344 --- /dev/null +++ b/rpc/rolling_strategy_test.go @@ -0,0 +1,56 @@ +package rpc + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type rollClient struct { + callCount int + name string + sortValue uint64 +} + +func (r *rollClient) fetchSortValue(ctx context.Context) (sortValue uint64, err error) { + return r.sortValue, nil +} + +func TestStickyRollingStrategy(t *testing.T) { + + rollingStrategy := NewStickyRollingStrategy[*rollClient]() + rollingStrategy.reset() + + clients := NewClients(2*time.Second, rollingStrategy) + clients.Add(&rollClient{name: "c.1"}) + clients.Add(&rollClient{name: "c.2"}) + clients.Add(&rollClient{name: "c.3"}) + clients.Add(&rollClient{name: "c.a"}) + clients.Add(&rollClient{name: "c.b"}) + + var clientNames []string + _, err := WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) { + clientNames = append(clientNames, client.name) + if client.name == "c.3" { + return nil, nil + } + + return nil, fmt.Errorf("next please") + }) + + require.NoError(t, err) + //require.ErrorIs(t, err, ErrorNoMoreClient) + require.Equal(t, []string{"c.1", "c.2", "c.3"}, clientNames) + + _, err = WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) { + clientNames = append(clientNames, client.name) + return nil, fmt.Errorf("next please") + }) + + require.ErrorIs(t, err, ErrorNoMoreClient) + require.Equal(t, []string{"c.1", "c.2", "c.3", "c.3", "c.a", "c.b", "c.1", "c.2"}, clientNames) + +} diff --git a/rpc/sort.go b/rpc/sort.go new file mode 100644 index 0000000..9f7902c --- /dev/null +++ b/rpc/sort.go @@ -0,0 +1,54 @@ +package rpc + +import ( + "context" + "sort" +) + +type SortValueFetcher interface { + fetchSortValue(ctx context.Context) (sortValue uint64, err error) +} + +type SortDirection int + +const ( + SortDirectionAscending SortDirection = iota + SortDirectionDescending +) + +func Sort[C any](ctx context.Context, clients *Clients[C], direction SortDirection) error { + type sortable struct { + clientIndex int + sortValue uint64 + } + var sortableValues []sortable + for i, client := range clients.clients { + var v uint64 + var err error + if s, ok := any(client).(SortValueFetcher); ok { + v, err = s.fetchSortValue(ctx) + if err != nil { + //do nothing + } + } + sortableValues = append(sortableValues, sortable{i, v}) + } + + sort.Slice(sortableValues, func(i, j int) bool { + if direction == SortDirectionAscending { + return sortableValues[i].sortValue < sortableValues[j].sortValue + } + return sortableValues[i].sortValue > sortableValues[j].sortValue + }) + + var sorted []C + for _, v := range sortableValues { + sorted = append(sorted, clients.clients[v.clientIndex]) + } + + clients.lock.Lock() + defer clients.lock.Unlock() + clients.clients = sorted + + return nil +} diff --git a/rpc/sort_test.go b/rpc/sort_test.go new file mode 100644 index 0000000..eb032e8 --- /dev/null +++ b/rpc/sort_test.go @@ -0,0 +1,42 @@ +package rpc + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestClientsSort(t *testing.T) { + rollingStrategy := NewStickyRollingStrategy[*rollClient]() + rollingStrategy.reset() + + clients := NewClients(2*time.Second, rollingStrategy) + clients.Add(&rollClient{name: "c.1", sortValue: 100}) + clients.Add(&rollClient{name: "c.2", sortValue: 101}) + clients.Add(&rollClient{name: "c.3", sortValue: 102}) + clients.Add(&rollClient{name: "c.a", sortValue: 103}) + clients.Add(&rollClient{name: "c.b", sortValue: 104}) + + err := Sort(context.Background(), clients, SortDirectionDescending) + require.NoError(t, err) + + var names []string + for _, client := range clients.clients { + names = append(names, client.name) + } + + require.Equal(t, []string{"c.b", "c.a", "c.3", "c.2", "c.1"}, names) + + err = Sort(context.Background(), clients, SortDirectionAscending) + require.NoError(t, err) + + names = []string{} + for _, client := range clients.clients { + names = append(names, client.name) + } + + require.Equal(t, []string{"c.1", "c.2", "c.3", "c.a", "c.b"}, names) + +}