From 81a18f0b06a4a892870d8d3a42fbe0261bb9ec18 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Mon, 20 Jan 2025 13:06:55 +0100 Subject: [PATCH] fix: downloader --- sync/evmdownloader.go | 88 +++++++++++++++++++----- sync/evmdownloader_test.go | 116 +++++++++++++++----------------- sync/evmdriver.go | 5 -- sync/evmtypes.go | 16 +++++ sync/mock_downloader_test.go | 38 +++++++++-- sync/mock_l2_test.go | 2 +- sync/mock_processor_test.go | 2 +- sync/mock_reorgdetector_test.go | 2 +- 8 files changed, 177 insertions(+), 92 deletions(-) diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index e22eabd6..bad890dd 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" ) type EthClienter interface { @@ -23,9 +24,10 @@ type EthClienter interface { type EVMDownloaderInterface interface { WaitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) - GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock + GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) EVMBlocks GetLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log GetBlockHeader(ctx context.Context, blockNum uint64) (EVMBlockHeader, bool) + GetLastFinalizedBlock(ctx context.Context) (*types.Header, error) } type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error @@ -73,6 +75,9 @@ func NewEVMDownloader( func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) { lastBlock := d.WaitForNewBlocks(ctx, 0) + toBlock := lastBlock + automaticToBlockResize := true + for { select { case <-ctx.Done(): @@ -81,10 +86,14 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download return default: } - toBlock := fromBlock + d.syncBlockChunkSize - if toBlock > lastBlock { - toBlock = lastBlock + + if automaticToBlockResize { + toBlock = fromBlock + d.syncBlockChunkSize + if toBlock > lastBlock { + toBlock = lastBlock + } } + if fromBlock > toBlock { d.log.Debugf( "waiting for new blocks, last block processed %d, last block seen on L1 %d", @@ -93,25 +102,66 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download lastBlock = d.WaitForNewBlocks(ctx, fromBlock-1) continue } + + lastFinalizedBlock, err := d.GetLastFinalizedBlock(ctx) + if err != nil { + d.log.Error("error getting last finalized block: ", err) + continue + } + + lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64() + d.log.Debugf("getting events from blocks %d to %d", fromBlock, toBlock) blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock) - for _, b := range blocks { - d.log.Debugf("sending block %d to the driver (with events)", b.Num) - downloadedCh <- b + + reportBlocksFn := func(numOfBlocksToReport int) { + for i := 0; i < numOfBlocksToReport; i++ { + d.log.Debugf("sending block %d to the driver (with events)", blocks[i].Num) + downloadedCh <- blocks[i] + } } - if len(blocks) == 0 || blocks[len(blocks)-1].Num < toBlock { - // Indicate the last downloaded block if there are not events on it - d.log.Debugf("sending block %d to the driver (without events)", toBlock) - header, isCanceled := d.GetBlockHeader(ctx, toBlock) - if isCanceled { - return + + if blocks.Len() == 0 { + // if we don't have logs for given block range, keep increasing the to block until we hit a log + // or we hit the last finalized block number + // this saves us from the possible reorgs that can happen while we are processing wrong hash + toBlock++ + if lastFinalizedBlockNumber > toBlock { + // we might be slow to process the blocks, so we need to catch up + toBlock = lastFinalizedBlockNumber + 1 } - downloadedCh <- EVMBlock{ - EVMBlockHeader: header, + automaticToBlockResize = false + continue + } else if blocks[blocks.Len()-1].Num <= lastFinalizedBlockNumber { + // if the last block we have logs for is less than or equal to the last finalized block, + // report all of the blocks without the need to report the last empty block, since it is finalized + // and we do not need to track it in the reorg detector + reportBlocksFn(blocks.Len()) + fromBlock = lastFinalizedBlockNumber + 1 + automaticToBlockResize = true + } else if blocks[len(blocks)-1].Num < toBlock { + // if we have logs in some of the blocks, and they are not all finalized, + // check if we have finalized blocks in gotten range, report them and + // set the from block from the last finalized block and keep increasing the range + lastFinalizedBlock, index, exists := blocks.LastFinalizedBlock(lastFinalizedBlockNumber) + if exists { + reportBlocksFn(index + 1) // num of blocks to report is index + 1 since index is zero based + fromBlock = lastFinalizedBlock + 1 + automaticToBlockResize = true + continue } + + toBlock++ // increase the to block until we hit either finalized blocks or a block with logs + automaticToBlockResize = false + } else { + // if we have logs in the last block, just report all of them and continue + // reorg detector will handle the reorg since the last block has events, + // and we are not afraid to have missaligned hashes at this point + reportBlocksFn(blocks.Len()) + fromBlock = toBlock + 1 + automaticToBlockResize = true } - fromBlock = toBlock + 1 } } @@ -149,6 +199,10 @@ func NewEVMDownloaderImplementation( } } +func (d *EVMDownloaderImplementation) GetLastFinalizedBlock(ctx context.Context) (*types.Header, error) { + return d.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.SafeBlockNumber))) +} + func (d *EVMDownloaderImplementation) WaitForNewBlocks( ctx context.Context, lastBlockSeen uint64, ) (newLastBlock uint64) { @@ -175,7 +229,7 @@ func (d *EVMDownloaderImplementation) WaitForNewBlocks( } } -func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock { +func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) EVMBlocks { select { case <-ctx.Done(): return nil diff --git a/sync/evmdownloader_test.go b/sync/evmdownloader_test.go index 04c92e72..66953f58 100644 --- a/sync/evmdownloader_test.go +++ b/sync/evmdownloader_test.go @@ -35,7 +35,7 @@ func TestGetEventsByBlockRange(t *testing.T) { description string inputLogs []types.Log fromBlock, toBlock uint64 - expectedBlocks []EVMBlock + expectedBlocks EVMBlocks } testCases := []testCase{} ctx := context.Background() @@ -47,7 +47,7 @@ func TestGetEventsByBlockRange(t *testing.T) { inputLogs: []types.Log{}, fromBlock: 1, toBlock: 3, - expectedBlocks: []EVMBlock{}, + expectedBlocks: EVMBlocks{}, } testCases = append(testCases, case0) @@ -56,7 +56,7 @@ func TestGetEventsByBlockRange(t *testing.T) { logsC1 := []types.Log{ *logC1, } - blocksC1 := []EVMBlock{ + blocksC1 := EVMBlocks{ { EVMBlockHeader: EVMBlockHeader{ Num: logC1.BlockNumber, @@ -121,7 +121,7 @@ func TestGetEventsByBlockRange(t *testing.T) { *logC3_3, *logC3_4, } - blocksC3 := []EVMBlock{ + blocksC3 := EVMBlocks{ { EVMBlockHeader: EVMBlockHeader{ Num: logC3_1.BlockNumber, @@ -206,32 +206,20 @@ func TestDownload(t *testing.T) { downloadCh := make(chan EVMBlock, 1) ctx := context.Background() ctx1, cancel := context.WithCancel(ctx) - expectedBlocks := []EVMBlock{} + expectedBlocks := EVMBlocks{} dwnldr, _ := NewTestDownloader(t) dwnldr.EVMDownloaderInterface = d d.On("WaitForNewBlocks", mock.Anything, uint64(0)). Return(uint64(1)) - // iteratiion 0: + + // iteration 0: // last block is 1, download that block (no events and wait) - b1 := EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: 1, - Hash: common.HexToHash("01"), - }, - } - expectedBlocks = append(expectedBlocks, b1) + d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(1)}, nil).Once() d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(1)). - Return([]EVMBlock{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(1)). - Return(b1.EVMBlockHeader, false) + Return(EVMBlocks{}, false) - // iteration 1: wait for next block to be created - d.On("WaitForNewBlocks", mock.Anything, uint64(1)). - After(time.Millisecond * 100). - Return(uint64(2)).Once() - - // iteration 2: block 2 has events + // iteration 1: block 2 has events b2 := EVMBlock{ EVMBlockHeader: EVMBlockHeader{ Num: 2, @@ -239,15 +227,16 @@ func TestDownload(t *testing.T) { }, } expectedBlocks = append(expectedBlocks, b2) - d.On("GetEventsByBlockRange", mock.Anything, uint64(2), uint64(2)). - Return([]EVMBlock{b2}, false) + d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(2)}, nil).Once() + d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(2)). + Return(EVMBlocks{b2}, false) - // iteration 3: wait for next block to be created (jump to block 8) + // iteration 2: wait for next block to be created (jump to block 8) d.On("WaitForNewBlocks", mock.Anything, uint64(2)). After(time.Millisecond * 100). Return(uint64(8)).Once() - // iteration 4: blocks 6 and 7 have events + // iteration 3: blocks 6 and 7 have events, but last finalized block is 5 b6 := EVMBlock{ EVMBlockHeader: EVMBlockHeader{ Num: 6, @@ -262,52 +251,53 @@ func TestDownload(t *testing.T) { }, Events: []interface{}{"07"}, } - b8 := EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: 8, - Hash: common.HexToHash("08"), - }, - } - expectedBlocks = append(expectedBlocks, b6, b7, b8) + expectedBlocks = append(expectedBlocks, b6, b7) + d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(5)}, nil).Once() d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(8)). - Return([]EVMBlock{b6, b7}, false) - d.On("GetBlockHeader", mock.Anything, uint64(8)). - Return(b8.EVMBlockHeader, false) + Return(EVMBlocks{b6, b7}, false) - // iteration 5: wait for next block to be created (jump to block 30) - d.On("WaitForNewBlocks", mock.Anything, uint64(8)). - After(time.Millisecond * 100). - Return(uint64(30)).Once() + // iteration 5: finalized block is now block 8, we report events b6 and b7 + d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(8)}, nil).Once() + d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(9)). + Return(EVMBlocks{b6, b7}, false) // iteration 6: from block 9 to 19, no events - b19 := EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: 19, - Hash: common.HexToHash("19"), - }, - } - expectedBlocks = append(expectedBlocks, b19) + d.On("WaitForNewBlocks", mock.Anything, uint64(8)). + After(time.Millisecond * 100). + Return(uint64(19)).Once() + d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(15)}, nil).Once() d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)). - Return([]EVMBlock{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(19)). - Return(b19.EVMBlockHeader, false) - - // iteration 7: from block 20 to 30, events on last block - b30 := EVMBlock{ + Return(EVMBlocks{}, false) + + // iteration 7: last finalized block is now 20, no events + d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Once() + d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(20)). + Return(EVMBlocks{}, false) + + // iteration 8, 9, 10, : last finalized block is still 20, no events + d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Times(3) + d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(21)). + Return(EVMBlocks{}, false) + d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(22)). + Return(EVMBlocks{}, false) + d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(23)). + Return(EVMBlocks{}, false) + + // iteration 11: from block 20 to 24, events on last block + b24 := EVMBlock{ EVMBlockHeader: EVMBlockHeader{ - Num: 30, - Hash: common.HexToHash("30"), + Num: 24, + Hash: common.HexToHash("24"), }, - Events: []interface{}{testEvent(common.HexToHash("30"))}, + Events: []interface{}{testEvent(common.HexToHash("24"))}, } - expectedBlocks = append(expectedBlocks, b30) - d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(30)). - Return([]EVMBlock{b30}, false) + expectedBlocks = append(expectedBlocks, b24) + d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Once() + d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(24)). + Return(EVMBlocks{b24}, false) - // iteration 8: wait for next block to be created (jump to block 35) - d.On("WaitForNewBlocks", mock.Anything, uint64(30)). - After(time.Millisecond * 100). - Return(uint64(35)).Once() + // iteration 12: closing the downloader + d.On("WaitForNewBlocks", mock.Anything, uint64(24)).Return(uint64(25)).After(time.Millisecond * 100).Once() go dwnldr.Download(ctx1, 0, downloadCh) for _, expectedBlock := range expectedBlocks { diff --git a/sync/evmdriver.go b/sync/evmdriver.go index 43ee2310..89fdf7c6 100644 --- a/sync/evmdriver.go +++ b/sync/evmdriver.go @@ -9,11 +9,6 @@ import ( "github.com/ethereum/go-ethereum/common" ) -type evmDownloaderFull interface { - EVMDownloaderInterface - downloader -} - type downloader interface { Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) } diff --git a/sync/evmtypes.go b/sync/evmtypes.go index d242dbc4..e2f1eefb 100644 --- a/sync/evmtypes.go +++ b/sync/evmtypes.go @@ -2,6 +2,22 @@ package sync import "github.com/ethereum/go-ethereum/common" +type EVMBlocks []EVMBlock + +func (e EVMBlocks) Len() int { + return len(e) +} + +func (e EVMBlocks) LastFinalizedBlock(lastFinalizedBlockOnNetwork uint64) (uint64, int, bool) { + for i := len(e) - 1; i >= 0; i-- { + if e[i].Num <= lastFinalizedBlockOnNetwork { + return e[i].Num, i, true + } + } + + return 0, 0, false // no finalized block found +} + type EVMBlock struct { EVMBlockHeader Events []interface{} diff --git a/sync/mock_downloader_test.go b/sync/mock_downloader_test.go index f28045b5..758d4b4e 100644 --- a/sync/mock_downloader_test.go +++ b/sync/mock_downloader_test.go @@ -48,25 +48,55 @@ func (_m *EVMDownloaderMock) GetBlockHeader(ctx context.Context, blockNum uint64 } // GetEventsByBlockRange provides a mock function with given fields: ctx, fromBlock, toBlock -func (_m *EVMDownloaderMock) GetEventsByBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) []EVMBlock { +func (_m *EVMDownloaderMock) GetEventsByBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) EVMBlocks { ret := _m.Called(ctx, fromBlock, toBlock) if len(ret) == 0 { panic("no return value specified for GetEventsByBlockRange") } - var r0 []EVMBlock - if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) []EVMBlock); ok { + var r0 EVMBlocks + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) EVMBlocks); ok { r0 = rf(ctx, fromBlock, toBlock) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]EVMBlock) + r0 = ret.Get(0).(EVMBlocks) } } return r0 } +// GetLastFinalizedBlock provides a mock function with given fields: ctx +func (_m *EVMDownloaderMock) GetLastFinalizedBlock(ctx context.Context) (*types.Header, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetLastFinalizedBlock") + } + + var r0 *types.Header + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*types.Header, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *types.Header); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Header) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetLogs provides a mock function with given fields: ctx, fromBlock, toBlock func (_m *EVMDownloaderMock) GetLogs(ctx context.Context, fromBlock uint64, toBlock uint64) []types.Log { ret := _m.Called(ctx, fromBlock, toBlock) diff --git a/sync/mock_l2_test.go b/sync/mock_l2_test.go index 7a4bae36..b89b751e 100644 --- a/sync/mock_l2_test.go +++ b/sync/mock_l2_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.39.0. DO NOT EDIT. +// Code generated by mockery v2.45.0. DO NOT EDIT. package sync diff --git a/sync/mock_processor_test.go b/sync/mock_processor_test.go index afbb34cb..375ab123 100644 --- a/sync/mock_processor_test.go +++ b/sync/mock_processor_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.39.0. DO NOT EDIT. +// Code generated by mockery v2.45.0. DO NOT EDIT. package sync diff --git a/sync/mock_reorgdetector_test.go b/sync/mock_reorgdetector_test.go index 9689f7e7..0bf53fa3 100644 --- a/sync/mock_reorgdetector_test.go +++ b/sync/mock_reorgdetector_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.39.0. DO NOT EDIT. +// Code generated by mockery v2.45.0. DO NOT EDIT. package sync