From 720694b7de7622b49a72f14e87cc19c84a95a77e Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 29 Jan 2025 11:22:59 +0100 Subject: [PATCH 1/4] fix: sync evmdownloader --- sync/evmdownloader.go | 79 ++++++--- sync/evmdownloader_test.go | 353 ++++++++++++++++++------------------- 2 files changed, 228 insertions(+), 204 deletions(-) diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index 1de2be73..9d5aeb3f 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -39,8 +39,9 @@ type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error type EVMDownloader struct { syncBlockChunkSize uint64 EVMDownloaderInterface - log *log.Logger - finalizedBlockType etherman.BlockNumberFinality + log *log.Logger + finalizedBlockType etherman.BlockNumberFinality + stopDownloaderOnIterationN int } func NewEVMDownloader( @@ -101,9 +102,16 @@ func NewEVMDownloader( }, nil } +// setStopDownloaderOnIterationN sets the block number to stop the downloader (just for unittest) +func (d *EVMDownloader) setStopDownloaderOnIterationN(iteration int) { + d.stopDownloaderOnIterationN = iteration +} + func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) { lastBlock := d.WaitForNewBlocks(ctx, 0) - + toBlock := fromBlock + d.syncBlockChunkSize + iteration := 0 + reachTop := false for { select { case <-ctx.Done(): @@ -112,53 +120,70 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download return default: } + d.log.Debugf("range: %d to %d, last block: %d", fromBlock, toBlock, lastBlock) - toBlock := fromBlock + d.syncBlockChunkSize - if toBlock > lastBlock { - toBlock = lastBlock - } - - if fromBlock > toBlock { - d.log.Infof( - "waiting for new blocks, last block processed: %d, last block seen on L1: %d", - fromBlock-1, lastBlock, + if reachTop && toBlock >= lastBlock { + d.log.Debugf( + "waiting for new blocks, current range: [%d to %d], last block seen: %d", + fromBlock, toBlock, lastBlock, ) - lastBlock = d.WaitForNewBlocks(ctx, fromBlock-1) - continue - } + lastBlock = d.WaitForNewBlocks(ctx, lastBlock) + d.log.Debugf("new last block seen: %d", lastBlock) + if fromBlock-toBlock < d.syncBlockChunkSize { + toBlock = fromBlock + d.syncBlockChunkSize + } + } + reachTop = false lastFinalizedBlock, err := d.GetLastFinalizedBlock(ctx) if err != nil { d.log.Error("error getting last finalized block: ", err) continue } - lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64() - d.log.Infof("getting events from blocks %d to %d. lastFinalizedBlock: %d", - fromBlock, toBlock, lastFinalizedBlockNumber) - blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock) - + requestToBlock := toBlock + if toBlock >= lastBlock { + requestToBlock = lastBlock + reachTop = true + } + d.log.Debugf("getting events from blocks [%d to %d] toBlock: %d. lastFinalizedBlock: %d lastBlock: %d", + fromBlock, requestToBlock, toBlock, lastFinalizedBlockNumber, lastBlock) + blocks := d.GetEventsByBlockRange(ctx, fromBlock, requestToBlock) + d.log.Debugf("result events from blocks [%d to %d] -> len(blocks)=%d", + fromBlock, requestToBlock, len(blocks)) if toBlock <= lastFinalizedBlockNumber { d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber) - fromBlock = toBlock + 1 - if blocks.Len() == 0 || blocks[blocks.Len()-1].Num < toBlock { d.reportEmptyBlock(ctx, downloadedCh, toBlock, lastFinalizedBlockNumber) } + fromBlock = toBlock + 1 + toBlock = fromBlock + d.syncBlockChunkSize } else { - d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber) - if blocks.Len() == 0 { if lastFinalizedBlockNumber > fromBlock && - lastFinalizedBlockNumber-fromBlock > d.syncBlockChunkSize { - d.reportEmptyBlock(ctx, downloadedCh, fromBlock+d.syncBlockChunkSize, lastFinalizedBlockNumber) - fromBlock += d.syncBlockChunkSize + 1 + toBlock-fromBlock > d.syncBlockChunkSize { + emptyBlock := lastFinalizedBlockNumber + d.reportEmptyBlock(ctx, downloadedCh, emptyBlock, lastFinalizedBlockNumber) + fromBlock = emptyBlock + 1 + toBlock = fromBlock + d.syncBlockChunkSize + } else { + + // Extend range until find logs or reach the last finalized block + toBlock += d.syncBlockChunkSize } } else { + d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber) fromBlock = blocks[blocks.Len()-1].Num + 1 + toBlock = fromBlock + d.syncBlockChunkSize } } + iteration++ + if d.stopDownloaderOnIterationN != 0 && iteration >= d.stopDownloaderOnIterationN { + d.log.Infof("stop downloader on iteration %d", iteration) + return + } + } } diff --git a/sync/evmdownloader_test.go b/sync/evmdownloader_test.go index b4c600f1..1b47a8bb 100644 --- a/sync/evmdownloader_test.go +++ b/sync/evmdownloader_test.go @@ -3,6 +3,7 @@ package sync import ( "context" "errors" + "fmt" "math/big" "strconv" "testing" @@ -197,183 +198,6 @@ func generateEvent(blockNum uint32) (*types.Log, testEvent) { return log, testEvent(h) } -func TestDownload(t *testing.T) { - /* - NOTE: due to the concurrent nature of this test (the function being tested runs through a goroutine) - if the mock doesn't match, the goroutine will get stuck and the test will timeout - */ - d := NewEVMDownloaderMock(t) - downloadCh := make(chan EVMBlock, 1) - ctx := context.Background() - ctx1, cancel := context.WithCancel(ctx) - expectedBlocks := EVMBlocks{} - dwnldr, _ := NewTestDownloader(t, time.Millisecond*100) - dwnldr.EVMDownloaderInterface = d - - d.On("WaitForNewBlocks", mock.Anything, uint64(0)). - Return(uint64(1)) - - lastFinalizedBlock := &types.Header{Number: big.NewInt(1)} - createEVMBlockFn := func(header *types.Header, isSafeBlock bool) *EVMBlock { - return &EVMBlock{ - IsFinalizedBlock: isSafeBlock, - EVMBlockHeader: EVMBlockHeader{ - Num: header.Number.Uint64(), - Hash: header.Hash(), - ParentHash: header.ParentHash, - Timestamp: header.Time, - }, - } - } - - // iteration 0: - // last block is 1, download that block (no events and wait) - b0 := createEVMBlockFn(lastFinalizedBlock, true) - expectedBlocks = append(expectedBlocks, b0) - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(1)). - Return(EVMBlocks{}, false).Once() - d.On("GetBlockHeader", mock.Anything, uint64(1)).Return(b0.EVMBlockHeader, false).Once() - - // iteration 1: we have a new block, so increase to block (no events) - lastFinalizedBlock = &types.Header{Number: big.NewInt(2)} - b2 := createEVMBlockFn(lastFinalizedBlock, true) - expectedBlocks = append(expectedBlocks, b2) - d.On("WaitForNewBlocks", mock.Anything, uint64(1)). - Return(uint64(2)) - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(2), uint64(2)). - Return(EVMBlocks{}, false).Once() - d.On("GetBlockHeader", mock.Anything, uint64(2)).Return(b2.EVMBlockHeader, false).Once() - - // iteration 2: wait for next block to be created (jump to block 8) - d.On("WaitForNewBlocks", mock.Anything, uint64(2)). - After(time.Millisecond * 100). - Return(uint64(8)).Once() - - // iteration 3: blocks 6 and 7 have events, last finalized block is 5 - lastFinalizedBlock = &types.Header{Number: big.NewInt(5)} - b6 := &EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: 6, - Hash: common.HexToHash("06"), - }, - Events: []interface{}{"06"}, - } - b7 := &EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: 7, - Hash: common.HexToHash("07"), - }, - Events: []interface{}{"07"}, - } - expectedBlocks = append(expectedBlocks, b6, b7) - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(8)). - Return(EVMBlocks{b6, b7}, false) - - // iteration 4: finalized block is now block 8, report the finalized block - lastFinalizedBlock = &types.Header{Number: big.NewInt(8)} - b8 := createEVMBlockFn(lastFinalizedBlock, true) - expectedBlocks = append(expectedBlocks, b8) - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(8), uint64(8)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(8)).Return(b8.EVMBlockHeader, false).Once() - - // iteration 5: from block 9 to 19, no events - lastFinalizedBlock = &types.Header{Number: big.NewInt(15)} - d.On("WaitForNewBlocks", mock.Anything, uint64(8)). - After(time.Millisecond * 100). - Return(uint64(19)).Once() - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)). - Return(EVMBlocks{}, false) - - // iteration 6: last finalized block is now 20, no events, report empty block - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)). - Return(EVMBlocks{}, false) - - d.On("WaitForNewBlocks", mock.Anything, uint64(19)). - After(time.Millisecond * 100). - Return(uint64(20)).Once() - b19 := createEVMBlockFn(&types.Header{Number: big.NewInt(19)}, true) - expectedBlocks = append(expectedBlocks, b19) - d.On("GetBlockHeader", mock.Anything, uint64(19)).Return(b19.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 8: last finalized block is 21, no events - b20 := createEVMBlockFn(&types.Header{Number: big.NewInt(20)}, true) - expectedBlocks = append(expectedBlocks, b20) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(21)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(20)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(20)).Return(b20.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 9: last finalized block is 22, no events - d.On("WaitForNewBlocks", mock.Anything, uint64(20)). - After(time.Millisecond * 100). - Return(uint64(21)).Once() - b21 := createEVMBlockFn(&types.Header{Number: big.NewInt(21)}, true) - expectedBlocks = append(expectedBlocks, b21) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(22)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(21), uint64(21)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(21)).Return(b21.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 10: last finalized block is 23, no events - d.On("WaitForNewBlocks", mock.Anything, uint64(21)). - After(time.Millisecond * 100). - Return(uint64(22)).Once() - b22 := createEVMBlockFn(&types.Header{Number: big.NewInt(22)}, true) - expectedBlocks = append(expectedBlocks, b22) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(23)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(22), uint64(22)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(22)).Return(b22.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 11: last finalized block is still 23, no events - d.On("WaitForNewBlocks", mock.Anything, uint64(22)). - After(time.Millisecond * 100). - Return(uint64(23)).Once() - b23 := createEVMBlockFn(&types.Header{Number: big.NewInt(23)}, true) - expectedBlocks = append(expectedBlocks, b23) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(23)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(23), uint64(23)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(23)).Return(b23.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 12: finalized block is 24, has events - d.On("WaitForNewBlocks", mock.Anything, uint64(23)). - After(time.Millisecond * 100). - Return(uint64(24)).Once() - b24 := &EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: 24, - Hash: common.HexToHash("24"), - }, - Events: []interface{}{testEvent(common.HexToHash("24"))}, - } - expectedBlocks = append(expectedBlocks, b24) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(24)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(24), uint64(24)). - Return(EVMBlocks{b24}, false) - - // iteration 13: closing the downloader - d.On("WaitForNewBlocks", mock.Anything, uint64(24)).Return(uint64(25)).After(time.Millisecond * 100).Once() - - go dwnldr.Download(ctx1, 0, downloadCh) - for _, expectedBlock := range expectedBlocks { - actualBlock := <-downloadCh - log.Debugf("block %d received!", actualBlock.Num) - require.Equal(t, *expectedBlock, actualBlock) - } - log.Debug("canceling") - cancel() - _, ok := <-downloadCh - require.False(t, ok) -} - func TestWaitForNewBlocks(t *testing.T) { ctx := context.Background() d, clientMock := NewTestDownloader(t, time.Millisecond*100) @@ -487,6 +311,168 @@ func TestGetLogs(t *testing.T) { require.Equal(t, []types.Log{}, logs) } +func TestDownloadEmptyReachingTopOfChain(t *testing.T) { + sut, clientMock := NewTestDownloader(t, time.Millisecond*100) + ctx := context.TODO() + downloadedCh := make(chan EVMBlock, 100) + sut.setStopDownloaderOnIterationN(7) + //blockNumBig := big.NewInt(5) + clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Latest))).Return(&types.Header{Number: big.NewInt(25)}, nil).Once() + clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Finalized))).Return(&types.Header{Number: big.NewInt(10)}, nil).Times(4) + // 1st range is 0 - 10 (chunkSize=10) + query := ethereum.FilterQuery{ + FromBlock: new(big.Int).SetUint64(0), + Addresses: []common.Address{contractAddr}, + ToBlock: new(big.Int).SetUint64(10), + } + clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() + // the range it's empty so it ask for empty block 10 + clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(10))).Return(&types.Header{Number: big.NewInt(10)}, nil) + // 2nd range is 10 - 21 (chunkSize=10) + query = ethereum.FilterQuery{ + FromBlock: new(big.Int).SetUint64(11), + Addresses: []common.Address{contractAddr}, + ToBlock: new(big.Int).SetUint64(21), + } + clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() + + // 3rd range is 11 - 31 (chunkSize=10) extend range, but latest block is 25 -> so request is [11-25] + query = ethereum.FilterQuery{ + FromBlock: new(big.Int).SetUint64(11), + Addresses: []common.Address{contractAddr}, + ToBlock: new(big.Int).SetUint64(25), + } + clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() + + // Now needs more block because current range is 10-31 and latest block is 25 + clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Latest))).Return(&types.Header{Number: big.NewInt(35)}, nil).Once() + // 4rd range is 11 - 41 (chunkSize=10) extend range, but latest block is 35 -> so request is [110-35] + query = ethereum.FilterQuery{ + FromBlock: new(big.Int).SetUint64(11), + Addresses: []common.Address{contractAddr}, + ToBlock: new(big.Int).SetUint64(35), + } + clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() + + // Now needs more block because current range is 10-41 and latest block is 35 + clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Latest))).Return(&types.Header{Number: big.NewInt(36)}, nil).Once() + // We also change the finalized to 15, to cut the range to empty 15, range [16-41] + clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Finalized))).Return(&types.Header{Number: big.NewInt(17)}, nil).Times(3) + // 5rd range is 11 - 36 (chunkSize=10) extend range, but latest block is 36 -> so request is [11-36] + query = ethereum.FilterQuery{ + FromBlock: new(big.Int).SetUint64(11), + Addresses: []common.Address{contractAddr}, + ToBlock: new(big.Int).SetUint64(36), + } + clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() + // Now must the finalize block is 17, so it must create a emtpy block 17 and cut to next range [18-28] + clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(17))).Return(&types.Header{Number: big.NewInt(15)}, nil).Once() + // 6st range is 18 - 28 (chunkSize=10) + query = ethereum.FilterQuery{ + FromBlock: new(big.Int).SetUint64(18), + Addresses: []common.Address{contractAddr}, + ToBlock: new(big.Int).SetUint64(28), + } + clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() + // 7st range is 18 - 36 (chunkSize=10) + query = ethereum.FilterQuery{ + FromBlock: new(big.Int).SetUint64(18), + Addresses: []common.Address{contractAddr}, + ToBlock: new(big.Int).SetUint64(36), + } + clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() + + sut.Download(ctx, 0, downloadedCh) + + expectedBlocks := []struct { + num uint64 + finalized bool + }{ + {num: 10, finalized: true}, + {num: 15, finalized: true}, + } + + for _, expectedBlock := range expectedBlocks { + actualBlock := <-downloadedCh + log.Debugf("block %d received!", actualBlock.Num) + require.Equal(t, expectedBlock.num, actualBlock.Num) + } +} + +func TestDownloadBeforeFinalized(t *testing.T) { + mockEthDownloader := NewEVMDownloaderMock(t) + + ctx := context.Background() + ctx1, cancel := context.WithCancel(ctx) + defer cancel() + + downloader, _ := NewTestDownloader(t, time.Millisecond) + downloader.EVMDownloaderInterface = mockEthDownloader + + steps := []struct { + finalizedBlock uint64 + fromBlock, toBlock uint64 + eventsReponse EVMBlocks + waitForNewBlocks bool + waitForNewBlocksRequest uint64 + waitForNewBlockReply uint64 + getBlockHeader *EVMBlockHeader + }{ + {finalizedBlock: 33, fromBlock: 1, toBlock: 11, waitForNewBlocks: true, waitForNewBlocksRequest: 0, waitForNewBlockReply: 35, getBlockHeader: &EVMBlockHeader{Num: 11}}, + {finalizedBlock: 33, fromBlock: 12, toBlock: 22, eventsReponse: EVMBlocks{createEVMBlock(t, 14, true)}, getBlockHeader: &EVMBlockHeader{Num: 22}}, + // It returns the last block of range, so it don't need to create a empty one + {finalizedBlock: 33, fromBlock: 23, toBlock: 33, eventsReponse: EVMBlocks{createEVMBlock(t, 33, true)}}, + // It reach the top of chain (block 35) + {finalizedBlock: 33, fromBlock: 34, toBlock: 35}, + // Previous iteration we reach top of chain so we need update the latest block + {finalizedBlock: 33, fromBlock: 34, toBlock: 54, waitForNewBlocks: true, waitForNewBlocksRequest: 35, waitForNewBlockReply: 60}, + // finalized block is 35, so we can reduce emit an emptyBlock and reduce the range + {finalizedBlock: 35, fromBlock: 34, toBlock: 60, getBlockHeader: &EVMBlockHeader{Num: 35}}, + {finalizedBlock: 35, fromBlock: 36, toBlock: 46}, + {finalizedBlock: 35, fromBlock: 36, toBlock: 56, eventsReponse: EVMBlocks{createEVMBlock(t, 36, false)}}, + // Block 36 is the new last block,so it reduce the range again to [37-47] + {finalizedBlock: 35, fromBlock: 37, toBlock: 47}, + {finalizedBlock: 57, fromBlock: 37, toBlock: 57, eventsReponse: EVMBlocks{createEVMBlock(t, 57, false)}}, + } + for i := 0; i < len(steps); i++ { + log.Info("iteration: ", i, "------------------------------------------------") + downloadCh := make(chan EVMBlock, 100) + downloader, _ := NewTestDownloader(t, time.Millisecond) + downloader.EVMDownloaderInterface = mockEthDownloader + downloader.setStopDownloaderOnIterationN(i + 1) + expectedBlocks := EVMBlocks{} + for _, step := range steps[:i+1] { + mockEthDownloader.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(int64(step.finalizedBlock))}, nil).Once() + if step.waitForNewBlocks { + mockEthDownloader.On("WaitForNewBlocks", mock.Anything, uint64(step.waitForNewBlocksRequest)).Return(uint64(step.waitForNewBlockReply)).Once() + } + mockEthDownloader.On("GetEventsByBlockRange", mock.Anything, uint64(step.fromBlock), uint64(step.toBlock)). + Return(step.eventsReponse, false).Once() + for _, eventBlock := range step.eventsReponse { + expectedBlocks = append(expectedBlocks, eventBlock) + } + if step.getBlockHeader != nil { + log.Infof("iteration:%d : GetBlockHeader(%d) ", i, step.getBlockHeader.Num) + mockEthDownloader.On("GetBlockHeader", mock.Anything, step.getBlockHeader.Num).Return(*step.getBlockHeader, false).Once() + expectedBlocks = append(expectedBlocks, &EVMBlock{ + EVMBlockHeader: *step.getBlockHeader, + IsFinalizedBlock: step.getBlockHeader.Num <= step.finalizedBlock, + }) + } + + } + downloader.Download(ctx1, 1, downloadCh) + mockEthDownloader.AssertExpectations(t) + for _, expectedBlock := range expectedBlocks { + log.Debugf("waiting block %d ", expectedBlock.Num) + actualBlock := <-downloadCh + log.Debugf("block %d received!", actualBlock.Num) + require.Equal(t, *expectedBlock, actualBlock) + } + } + +} + func buildAppender() LogAppenderMap { appender := make(LogAppenderMap) appender[eventSignature] = func(b *EVMBlock, l types.Log) error { @@ -512,3 +498,16 @@ func NewTestDownloader(t *testing.T, retryPeriod time.Duration) (*EVMDownloader, require.NoError(t, err) return d, clientMock } + +func createEVMBlock(t *testing.T, num uint64, isSafeBlock bool) *EVMBlock { + t.Helper() + return &EVMBlock{ + IsFinalizedBlock: isSafeBlock, + EVMBlockHeader: EVMBlockHeader{ + Num: num, + Hash: common.HexToHash(fmt.Sprintf("0x%.2X", num)), + ParentHash: common.HexToHash(fmt.Sprintf("0x%.2X", num-1)), + Timestamp: uint64(time.Now().Unix()), + }, + } +} From e023097e818a5dcea6a53316bef1c4250bb5e73e Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 29 Jan 2025 11:54:18 +0100 Subject: [PATCH 2/4] fix: unittest and lint --- sync/evmdownloader.go | 5 +- sync/evmdownloader_test.go | 96 ++------------------------------------ 2 files changed, 5 insertions(+), 96 deletions(-) diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index 9d5aeb3f..4775b5e6 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -161,14 +161,12 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download toBlock = fromBlock + d.syncBlockChunkSize } else { if blocks.Len() == 0 { - if lastFinalizedBlockNumber > fromBlock && - toBlock-fromBlock > d.syncBlockChunkSize { + if lastFinalizedBlockNumber >= fromBlock { emptyBlock := lastFinalizedBlockNumber d.reportEmptyBlock(ctx, downloadedCh, emptyBlock, lastFinalizedBlockNumber) fromBlock = emptyBlock + 1 toBlock = fromBlock + d.syncBlockChunkSize } else { - // Extend range until find logs or reach the last finalized block toBlock += d.syncBlockChunkSize } @@ -183,7 +181,6 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download d.log.Infof("stop downloader on iteration %d", iteration) return } - } } diff --git a/sync/evmdownloader_test.go b/sync/evmdownloader_test.go index 1b47a8bb..57f4c284 100644 --- a/sync/evmdownloader_test.go +++ b/sync/evmdownloader_test.go @@ -311,94 +311,6 @@ func TestGetLogs(t *testing.T) { require.Equal(t, []types.Log{}, logs) } -func TestDownloadEmptyReachingTopOfChain(t *testing.T) { - sut, clientMock := NewTestDownloader(t, time.Millisecond*100) - ctx := context.TODO() - downloadedCh := make(chan EVMBlock, 100) - sut.setStopDownloaderOnIterationN(7) - //blockNumBig := big.NewInt(5) - clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Latest))).Return(&types.Header{Number: big.NewInt(25)}, nil).Once() - clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Finalized))).Return(&types.Header{Number: big.NewInt(10)}, nil).Times(4) - // 1st range is 0 - 10 (chunkSize=10) - query := ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(0), - Addresses: []common.Address{contractAddr}, - ToBlock: new(big.Int).SetUint64(10), - } - clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() - // the range it's empty so it ask for empty block 10 - clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(10))).Return(&types.Header{Number: big.NewInt(10)}, nil) - // 2nd range is 10 - 21 (chunkSize=10) - query = ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(11), - Addresses: []common.Address{contractAddr}, - ToBlock: new(big.Int).SetUint64(21), - } - clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() - - // 3rd range is 11 - 31 (chunkSize=10) extend range, but latest block is 25 -> so request is [11-25] - query = ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(11), - Addresses: []common.Address{contractAddr}, - ToBlock: new(big.Int).SetUint64(25), - } - clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() - - // Now needs more block because current range is 10-31 and latest block is 25 - clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Latest))).Return(&types.Header{Number: big.NewInt(35)}, nil).Once() - // 4rd range is 11 - 41 (chunkSize=10) extend range, but latest block is 35 -> so request is [110-35] - query = ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(11), - Addresses: []common.Address{contractAddr}, - ToBlock: new(big.Int).SetUint64(35), - } - clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() - - // Now needs more block because current range is 10-41 and latest block is 35 - clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Latest))).Return(&types.Header{Number: big.NewInt(36)}, nil).Once() - // We also change the finalized to 15, to cut the range to empty 15, range [16-41] - clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(etherman.Finalized))).Return(&types.Header{Number: big.NewInt(17)}, nil).Times(3) - // 5rd range is 11 - 36 (chunkSize=10) extend range, but latest block is 36 -> so request is [11-36] - query = ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(11), - Addresses: []common.Address{contractAddr}, - ToBlock: new(big.Int).SetUint64(36), - } - clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() - // Now must the finalize block is 17, so it must create a emtpy block 17 and cut to next range [18-28] - clientMock.EXPECT().HeaderByNumber(ctx, big.NewInt(int64(17))).Return(&types.Header{Number: big.NewInt(15)}, nil).Once() - // 6st range is 18 - 28 (chunkSize=10) - query = ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(18), - Addresses: []common.Address{contractAddr}, - ToBlock: new(big.Int).SetUint64(28), - } - clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() - // 7st range is 18 - 36 (chunkSize=10) - query = ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(18), - Addresses: []common.Address{contractAddr}, - ToBlock: new(big.Int).SetUint64(36), - } - clientMock.EXPECT().FilterLogs(ctx, query).Return(nil, nil).Once() - - sut.Download(ctx, 0, downloadedCh) - - expectedBlocks := []struct { - num uint64 - finalized bool - }{ - {num: 10, finalized: true}, - {num: 15, finalized: true}, - } - - for _, expectedBlock := range expectedBlocks { - actualBlock := <-downloadedCh - log.Debugf("block %d received!", actualBlock.Num) - require.Equal(t, expectedBlock.num, actualBlock.Num) - } -} - func TestDownloadBeforeFinalized(t *testing.T) { mockEthDownloader := NewEVMDownloaderMock(t) @@ -433,6 +345,8 @@ func TestDownloadBeforeFinalized(t *testing.T) { // Block 36 is the new last block,so it reduce the range again to [37-47] {finalizedBlock: 35, fromBlock: 37, toBlock: 47}, {finalizedBlock: 57, fromBlock: 37, toBlock: 57, eventsReponse: EVMBlocks{createEVMBlock(t, 57, false)}}, + {finalizedBlock: 61, fromBlock: 58, toBlock: 60, eventsReponse: EVMBlocks{createEVMBlock(t, 60, false)}}, + {finalizedBlock: 61, fromBlock: 61, toBlock: 61, waitForNewBlocks: true, waitForNewBlocksRequest: 60, waitForNewBlockReply: 61, getBlockHeader: &EVMBlockHeader{Num: 61}}, } for i := 0; i < len(steps); i++ { log.Info("iteration: ", i, "------------------------------------------------") @@ -444,9 +358,9 @@ func TestDownloadBeforeFinalized(t *testing.T) { for _, step := range steps[:i+1] { mockEthDownloader.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(int64(step.finalizedBlock))}, nil).Once() if step.waitForNewBlocks { - mockEthDownloader.On("WaitForNewBlocks", mock.Anything, uint64(step.waitForNewBlocksRequest)).Return(uint64(step.waitForNewBlockReply)).Once() + mockEthDownloader.On("WaitForNewBlocks", mock.Anything, step.waitForNewBlocksRequest).Return(step.waitForNewBlockReply).Once() } - mockEthDownloader.On("GetEventsByBlockRange", mock.Anything, uint64(step.fromBlock), uint64(step.toBlock)). + mockEthDownloader.On("GetEventsByBlockRange", mock.Anything, step.fromBlock, step.toBlock). Return(step.eventsReponse, false).Once() for _, eventBlock := range step.eventsReponse { expectedBlocks = append(expectedBlocks, eventBlock) @@ -459,7 +373,6 @@ func TestDownloadBeforeFinalized(t *testing.T) { IsFinalizedBlock: step.getBlockHeader.Num <= step.finalizedBlock, }) } - } downloader.Download(ctx1, 1, downloadCh) mockEthDownloader.AssertExpectations(t) @@ -470,7 +383,6 @@ func TestDownloadBeforeFinalized(t *testing.T) { require.Equal(t, *expectedBlock, actualBlock) } } - } func buildAppender() LogAppenderMap { From 4f17fab8595868a7b44bf961ab100f726c176035 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 29 Jan 2025 12:11:34 +0100 Subject: [PATCH 3/4] fix: unittest and lint --- sync/evmdownloader.go | 2 +- sync/evmdownloader_test.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index 4775b5e6..ab8b952f 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -122,7 +122,7 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download } d.log.Debugf("range: %d to %d, last block: %d", fromBlock, toBlock, lastBlock) - if reachTop && toBlock >= lastBlock { + if fromBlock > lastBlock || (reachTop && toBlock >= lastBlock) { d.log.Debugf( "waiting for new blocks, current range: [%d to %d], last block seen: %d", fromBlock, toBlock, lastBlock, diff --git a/sync/evmdownloader_test.go b/sync/evmdownloader_test.go index 57f4c284..fed8a8e1 100644 --- a/sync/evmdownloader_test.go +++ b/sync/evmdownloader_test.go @@ -347,6 +347,7 @@ func TestDownloadBeforeFinalized(t *testing.T) { {finalizedBlock: 57, fromBlock: 37, toBlock: 57, eventsReponse: EVMBlocks{createEVMBlock(t, 57, false)}}, {finalizedBlock: 61, fromBlock: 58, toBlock: 60, eventsReponse: EVMBlocks{createEVMBlock(t, 60, false)}}, {finalizedBlock: 61, fromBlock: 61, toBlock: 61, waitForNewBlocks: true, waitForNewBlocksRequest: 60, waitForNewBlockReply: 61, getBlockHeader: &EVMBlockHeader{Num: 61}}, + {finalizedBlock: 61, fromBlock: 62, toBlock: 62, waitForNewBlocks: true, waitForNewBlocksRequest: 61, waitForNewBlockReply: 62}, } for i := 0; i < len(steps); i++ { log.Info("iteration: ", i, "------------------------------------------------") From 4c20b3f3517e28fccebe2bb5ff2a73a9e959e7ad Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 29 Jan 2025 12:37:18 +0100 Subject: [PATCH 4/4] fix: unittest and lint --- sync/evmdownloader.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index ab8b952f..f293b3b7 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -140,7 +140,8 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download d.log.Error("error getting last finalized block: ", err) continue } - lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64() + // lastFinalizedBlock can't be > lastBlock + lastFinalizedBlockNumber := min(lastBlock, lastFinalizedBlock.Number.Uint64()) requestToBlock := toBlock if toBlock >= lastBlock {