From 55df14375b6f573fd3b62cb716d20a61e6244c70 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Mon, 20 Jan 2025 15:33:28 +0100 Subject: [PATCH] fix: driver handling closing download channel --- l1infotreesync/e2e_test.go | 12 +++++------- sync/evmdownloader.go | 21 ++++++++++----------- sync/evmdriver.go | 10 +++++++--- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/l1infotreesync/e2e_test.go b/l1infotreesync/e2e_test.go index 2e22e138..cad28b70 100644 --- a/l1infotreesync/e2e_test.go +++ b/l1infotreesync/e2e_test.go @@ -66,7 +66,7 @@ func TestE2E(t *testing.T) { rdm.On("AddBlockToTrack", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t) - syncer, err := l1infotreesync.New(ctx, dbPath, gerAddr, verifyAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3, + syncer, err := l1infotreesync.New(ctx, dbPath, gerAddr, verifyAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 25, l1infotreesync.FlagAllowWrongContractsAddrs) require.NoError(t, err) @@ -228,9 +228,6 @@ func TestWithReorgs(t *testing.T) { // Block 4, 5, 6 after the fork commitBlocks(t, client, 3, time.Millisecond*500) - // Make sure syncer is up to date - waitForSyncerToCatchUp(ctx, t, syncer, client) - // Assert rollup exit root after the fork - should be zero since there are no events in the block after the fork expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false}) require.NoError(t, err) @@ -244,11 +241,12 @@ func TestWithReorgs(t *testing.T) { require.NoError(t, err) time.Sleep(time.Millisecond * 500) + commitBlocks(t, client, 1, time.Millisecond*100) + // create some events and update the trees updateL1InfoTreeAndRollupExitTree(2, 1) - // Block 4, 5, 6, 7 after the fork - commitBlocks(t, client, 4, time.Millisecond*100) + commitBlocks(t, client, 1, time.Millisecond*100) // Make sure syncer is up to date waitForSyncerToCatchUp(ctx, t, syncer, client) @@ -323,7 +321,7 @@ func TestStressAndReorgs(t *testing.T) { } } - commitBlocks(t, client, 1, time.Millisecond*10) + commitBlocks(t, client, 11, time.Millisecond*10) waitForSyncerToCatchUp(ctx, t, syncer, client) diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index f23701d7..e625f341 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -76,7 +76,7 @@ func NewEVMDownloader( func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) { lastBlock := d.WaitForNewBlocks(ctx, 0) toBlock := fromBlock - automaticToBlockResize := true + //automaticToBlockResize := true for { select { @@ -87,14 +87,14 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download default: } - if automaticToBlockResize { - toBlock = fromBlock + d.syncBlockChunkSize - if toBlock > lastBlock { - toBlock = lastBlock - } + //if automaticToBlockResize { + toBlock = fromBlock + d.syncBlockChunkSize + if toBlock > lastBlock { + toBlock = lastBlock } + //} - automaticToBlockResize = true // reset the flag + //automaticToBlockResize = true // reset the flag if fromBlock > toBlock { d.log.Debugf( @@ -141,8 +141,6 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download if lastFinalizedBlockNumber > toBlock { // we might be behind a lot, so go until last finalized block toBlock = lastFinalizedBlockNumber + 1 - } else { - toBlock++ } if lastFinalizedBlockNumber-fromBlock >= d.syncBlockChunkSize { @@ -153,7 +151,7 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download fromBlock = lastFinalizedBlockNumber } - automaticToBlockResize = false + //automaticToBlockResize = false continue } else if blocks[blocks.Len()-1].Num <= lastFinalizedBlockNumber { @@ -162,10 +160,11 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download // and we do not need to track it in the reorg detector reportBlocksFn(blocks.Len()) fromBlock = lastFinalizedBlockNumber + 1 - } else if blocks[len(blocks)-1].Num < toBlock { + } else if blocks[blocks.Len()-1].Num < toBlock { // if we have logs in some of the blocks, and they are not all finalized, // check if we have finalized blocks in gotten range, report them and // set the from block from the last finalized block and keep increasing the range + // if not keep getting that range to protect us from possible mishandling of block hashes lastFinalizedBlock, index, exists := blocks.LastFinalizedBlock(lastFinalizedBlockNumber) if exists { reportBlocksFn(index + 1) // num of blocks to report is index + 1 since index is zero based diff --git a/sync/evmdriver.go b/sync/evmdriver.go index 89fdf7c6..72bd053b 100644 --- a/sync/evmdriver.go +++ b/sync/evmdriver.go @@ -81,7 +81,7 @@ reset: cancellableCtx, cancel := context.WithCancel(ctx) defer cancel() - log.Info("Starting sync...", " lastProcessedBlock", lastProcessedBlock) + log.Info("Starting sync...", " lastProcessedBlock ", lastProcessedBlock) // start downloading downloadCh := make(chan EVMBlock, d.downloadBufferSize) go d.downloader.Download(cancellableCtx, lastProcessedBlock+1, downloadCh) @@ -92,9 +92,13 @@ reset: d.log.Info("sync stopped due to context done") cancel() return - case b := <-downloadCh: + case b, ok := <-downloadCh: d.log.Debugf("handleNewBlock, blockNum: %d, blockHash: %s", b.Num, b.Hash) - d.handleNewBlock(ctx, cancel, b) + if ok { + // when channel is closing, it is sending an empty block with num = 0, and empty hash + // because it is not passing object by reference, but by value, so do not handle that since it is closing + d.handleNewBlock(ctx, cancel, b) + } case firstReorgedBlock := <-d.reorgSub.ReorgedBlock: d.log.Info("handleReorg from block: ", firstReorgedBlock) d.handleReorg(ctx, cancel, firstReorgedBlock)