diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index f8ec9c82..8f35e82e 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -134,7 +134,7 @@ func (bm blockMap) removeRange(from, to uint64) { type Subscription struct { FirstReorgedBlock chan uint64 ReorgProcessed chan bool - pendingReorgsToBeProcessed *sync.WaitGroup + pendingReorgsToBeProcessed sync.WaitGroup } type ReorgDetector struct { @@ -266,7 +266,7 @@ func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFi sortedBlocks := blocks.getSorted() lastTrackedBlock = sortedBlocks[len(blocks)-1].Num - for _, block = range blocks { + for _, block = range sortedBlocks { if actualBlockHash, ok = blocksGotten[block.Num]; !ok { actualBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(block.Num))) if err != nil { diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index 77f546e3..05c6ac23 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -84,6 +84,7 @@ func TestBlockMap(t *testing.T) { t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) } } + func TestReorgDetector_New(t *testing.T) { t.Parallel() @@ -192,6 +193,50 @@ func TestReorgDetector_New(t *testing.T) { require.True(t, exists) require.Len(t, testSubscriberMap, 0) // since all blocks are finalized }) + + t.Run("have tracked blocks and subscriptions - reorg happened", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + trackedBlocks := createTestBlocks(t, 1, 5) + testSubscriberBlocks := trackedBlocks[:5] + + insertTestData(t, ctx, db, nil, unfalisedBlocksID) // no unfinalised blocks + insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) + + for _, block := range trackedBlocks[:3] { + client.On("HeaderByNumber", ctx, block.Number).Return( + block, nil, + ) + } + + reorgedBlocks := createTestBlocks(t, 4, 2) // block 4, and 5 are reorged + reorgedBlocks[0].ParentHash = trackedBlocks[2].Hash() // block 4 is reorged but his parent is block 3 + reorgedBlocks[1].ParentHash = reorgedBlocks[0].Hash() // block 5 is reorged but his parent is block 4 + + client.On("HeaderByNumber", ctx, reorgedBlocks[0].Number).Return( + reorgedBlocks[0], nil, + ) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + reorgedBlocks[len(reorgedBlocks)-1], nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks + + // we wait for the subscriber to be notified about the reorg + firstReorgedBlock := <-rd.subscriptions[testSubscriber].FirstReorgedBlock + require.Equal(t, reorgedBlocks[0].Number.Uint64(), firstReorgedBlock) + + // all blocks should be cleaned from the tracked blocks + // since subscriber had 5 blocks, 3 were finalized, and 2 were reorged but also finalized + subscriberBlocks := rd.trackedBlocks[testSubscriber] + require.Len(t, subscriberBlocks, 0) + }) } func createTestBlocks(t *testing.T, startBlock uint64, count uint64) []*types.Header {