From ba63211e03877669c6af95153ac302336fcad9c0 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 15 Jan 2025 12:57:01 +0100 Subject: [PATCH] fix: bug fix and UTs --- reorgdetector/reorgdetector.go | 9 +-- reorgdetector/reorgdetector_db.go | 2 +- reorgdetector/reorgdetector_test.go | 99 +++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 5 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index a5089279c..b8876fc48 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -159,10 +159,11 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { // and hashes matches. If higher than finalized block, we assume a reorg still might happen. if hdr.Num <= lastFinalisedBlock.Number.Uint64() { hdrs.removeRange(hdr.Num, hdr.Num) - } - if err := rd.removeTrackedBlockRange(id, hdr.Num, hdr.Num); err != nil { - return fmt.Errorf("error removing blocks from DB for subscriber %s between blocks %d and %d: %w", - id, hdr.Num, hdr.Num, err) + + if err := rd.removeTrackedBlockRange(id, hdr.Num, hdr.Num); err != nil { + return fmt.Errorf("error removing blocks from DB for subscriber %s between blocks %d and %d: %w", + id, hdr.Num, hdr.Num, err) + } } continue diff --git a/reorgdetector/reorgdetector_db.go b/reorgdetector/reorgdetector_db.go index 3a066b7f2..a0abc53e5 100644 --- a/reorgdetector/reorgdetector_db.go +++ b/reorgdetector/reorgdetector_db.go @@ -70,7 +70,7 @@ func (rd *ReorgDetector) saveTrackedBlock(id string, b header) error { // updateTrackedBlocksDB updates the tracked blocks for a subscriber in db func (rd *ReorgDetector) removeTrackedBlockRange(id string, fromBlock, toBlock uint64) error { _, err := rd.db.Exec( - "DELETE FROM tracked_block WHERE num >= $1 AND NUM <= 2 AND subscriber_id = $3;", + "DELETE FROM tracked_block WHERE num >= $1 AND NUM <= $2 AND subscriber_id = $3;", fromBlock, toBlock, id, ) return err diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index 2a69aba2a..ef5799327 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -2,14 +2,18 @@ package reorgdetector import ( "context" + big "math/big" "path" "strings" + "sync" "testing" "time" cdktypes "github.com/0xPolygon/cdk/config/types" common "github.com/ethereum/go-ethereum/common" + types "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient/simulated" + "github.com/ethereum/go-ethereum/rpc" "github.com/stretchr/testify/require" ) @@ -158,3 +162,98 @@ func TestNotSubscribed(t *testing.T) { err = reorgDetector.AddBlockToTrack(context.Background(), "foo", 1, common.Hash{}) require.True(t, strings.Contains(err.Error(), "is not subscribed")) } + +func TestDetectReorgs_BlockNotFinalized(t *testing.T) { + t.Parallel() + + ctx := context.Background() + syncerID := "test-syncer" + trackedBlock := &types.Header{Number: big.NewInt(9)} + + t.Run("Block not finalized", func(t *testing.T) { + t.Parallel() + + lastFinalizedBlock := &types.Header{Number: big.NewInt(8)} + client := NewEthClientMock(t) + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return(lastFinalizedBlock, nil) + client.On("HeaderByNumber", ctx, trackedBlock.Number).Return(trackedBlock, nil) + + testDir := path.Join(t.TempDir(), "reorgdetectorTestDetectReorgs.sqlite") + reorgDetector, err := New(client, Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) + require.NoError(t, err) + + _, err = reorgDetector.Subscribe(syncerID) + require.NoError(t, err) + require.NoError(t, reorgDetector.AddBlockToTrack(ctx, syncerID, trackedBlock.Number.Uint64(), trackedBlock.Hash())) + + require.NoError(t, reorgDetector.detectReorgInTrackedList(ctx)) + + trackedBlocks, err := reorgDetector.getTrackedBlocks() + require.NoError(t, err) + require.Equal(t, 1, len(trackedBlocks)) + + syncerTrackedBlocks, ok := trackedBlocks[syncerID] + require.True(t, ok) + require.Equal(t, 1, syncerTrackedBlocks.len()) + }) + + t.Run("Block finalized", func(t *testing.T) { + t.Parallel() + + lastFinalizedBlock := trackedBlock + client := NewEthClientMock(t) + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return(lastFinalizedBlock, nil) + + testDir := path.Join(t.TempDir(), "reorgdetectorTestDetectReorgs.sqlite") + reorgDetector, err := New(client, Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) + require.NoError(t, err) + + _, err = reorgDetector.Subscribe(syncerID) + require.NoError(t, err) + require.NoError(t, reorgDetector.AddBlockToTrack(ctx, syncerID, trackedBlock.Number.Uint64(), trackedBlock.Hash())) + + require.NoError(t, reorgDetector.detectReorgInTrackedList(ctx)) + + trackedBlocks, err := reorgDetector.getTrackedBlocks() + require.NoError(t, err) + require.Equal(t, 0, len(trackedBlocks)) + }) + + t.Run("Reorg happened", func(t *testing.T) { + t.Parallel() + + lastFinalizedBlock := &types.Header{Number: big.NewInt(5)} + reorgedTrackedBlock := &types.Header{Number: trackedBlock.Number, Extra: []byte("reorged")} // Different hash + + client := NewEthClientMock(t) + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return(lastFinalizedBlock, nil) + client.On("HeaderByNumber", ctx, trackedBlock.Number).Return(reorgedTrackedBlock, nil) + + testDir := path.Join(t.TempDir(), "reorgdetectorTestDetectReorgs.sqlite") + reorgDetector, err := New(client, Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) + require.NoError(t, err) + + subscription, err := reorgDetector.Subscribe(syncerID) + require.NoError(t, err) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + <-subscription.ReorgedBlock + subscription.ReorgProcessed <- true + + wg.Done() + }() + + require.NoError(t, reorgDetector.AddBlockToTrack(ctx, syncerID, trackedBlock.Number.Uint64(), trackedBlock.Hash())) + + require.NoError(t, reorgDetector.detectReorgInTrackedList(ctx)) + + wg.Wait() // we wait here to make sure the reorg is processed + + trackedBlocks, err := reorgDetector.getTrackedBlocks() + require.NoError(t, err) + require.Equal(t, 0, len(trackedBlocks)) // shouldn't be any since a reorg happened on that block + }) +}