diff --git a/blockpoller/poller_client_test.go b/blockpoller/poller_client_test.go index 8b68727..dae4a91 100644 --- a/blockpoller/poller_client_test.go +++ b/blockpoller/poller_client_test.go @@ -9,6 +9,7 @@ import ( pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/firehose-core/rpc" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) type TestBlockItem struct { @@ -98,7 +99,7 @@ func (t TestBlockFetcherWithClient) Fetch(ctx context.Context, client *TestBlock } func TestPollerClient(t *testing.T) { - clients := rpc.NewClients[*TestBlockClient](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[*TestBlockClient]()) + clients := rpc.NewClients[*TestBlockClient](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[*TestBlockClient](), zap.NewNop()) var blockItems1 []*TestBlockItem var blockItems2 []*TestBlockItem diff --git a/blockpoller/poller_test.go b/blockpoller/poller_test.go index 114f5e2..9df2d50 100644 --- a/blockpoller/poller_test.go +++ b/blockpoller/poller_test.go @@ -13,6 +13,7 @@ import ( "github.com/streamingfast/firehose-core/rpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestForkHandler_run(t *testing.T) { @@ -167,7 +168,7 @@ func TestForkHandler_run(t *testing.T) { blockFetcher := newTestBlockFetcher[any](t, tt.blocks) blockFinalizer := newTestBlockFinalizer(t, tt.expectFireBlock) - clients := rpc.NewClients[any](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[any]()) + clients := rpc.NewClients[any](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[any](), zap.NewNop()) clients.Add(new(any)) poller := New(blockFetcher, blockFinalizer, clients) diff --git a/blockpoller/state_file_test.go b/blockpoller/state_file_test.go index 919d0cb..f282aca 100644 --- a/blockpoller/state_file_test.go +++ b/blockpoller/state_file_test.go @@ -76,7 +76,7 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) { defer os.Remove(dirName) blockFetcher := newTestBlockFetcher[any](t, []*TestBlock{tb("60a", "59a", 60)}) - clients := rpc.NewClients[any](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[any]()) + clients := rpc.NewClients[any](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[any](), zap.NewNop()) clients.Add(new(any)) poller := &BlockPoller[any]{ diff --git a/rpc/rolling_strategy.go b/rpc/rolling_strategy.go index 7b8c5a5..a91512e 100644 --- a/rpc/rolling_strategy.go +++ b/rpc/rolling_strategy.go @@ -21,8 +21,6 @@ func (s *StickyRollingStrategy[C]) reset() { s.usedClientCount = 0 } func (s *StickyRollingStrategy[C]) next(clients *Clients[C]) (client C, err error) { - clients.lock.Lock() - defer clients.lock.Unlock() if len(clients.clients) == s.usedClientCount { return client, ErrorNoMoreClient @@ -61,9 +59,6 @@ func (s *StickyRollingStrategy[C]) next(clients *Clients[C]) (client C, err erro } func (s *StickyRollingStrategy[C]) prevIndex(clients *Clients[C]) int { - clients.lock.Lock() - defer clients.lock.Unlock() - if s.nextClientIndex == 0 { return len(clients.clients) - 1 } @@ -83,8 +78,6 @@ func (s *RollingStrategyAlwaysUseFirst[C]) reset() { } func (s *RollingStrategyAlwaysUseFirst[C]) next(c *Clients[C]) (client C, err error) { - c.lock.Lock() - defer c.lock.Unlock() if len(c.clients) <= s.nextIndex { return client, ErrorNoMoreClient diff --git a/rpc/sort_test.go b/rpc/sort_test.go index 4e95891..cb7ddb4 100644 --- a/rpc/sort_test.go +++ b/rpc/sort_test.go @@ -13,7 +13,7 @@ import ( type testSortFetcher struct { } -func (t *testSortFetcher) fetchSortValue(ctx context.Context, client *rollClient) (sortValue uint64, err error) { +func (t *testSortFetcher) FetchSortValue(ctx context.Context, client *rollClient) (sortValue uint64, err error) { return client.sortValue, nil }