diff --git a/bridgesync/bridgesync.go b/bridgesync/bridgesync.go
index 4052ba29..4f6f1e90 100644
--- a/bridgesync/bridgesync.go
+++ b/bridgesync/bridgesync.go
@@ -12,8 +12,8 @@ import (
 )
 
 const (
-	bridgeSyncL1 = "L1"
-	bridgeSyncL2 = "L2"
+	bridgeSyncL1 = "BridgeSyncL1"
+	bridgeSyncL2 = "BridgeSyncL2"
 
 	downloadBufferSize = 1000
 )
@@ -45,6 +45,7 @@ func NewL1(
 	maxRetryAttemptsAfterError int,
 	originNetwork uint32,
 	syncFullClaims bool,
+	finalizedBlockType etherman.BlockNumberFinality,
 ) (*BridgeSync, error) {
 	return newBridgeSync(
 		ctx,
@@ -61,6 +62,7 @@ func NewL1(
 		maxRetryAttemptsAfterError,
 		originNetwork,
 		syncFullClaims,
+		finalizedBlockType,
 	)
 }
 
@@ -79,6 +81,7 @@ func NewL2(
 	maxRetryAttemptsAfterError int,
 	originNetwork uint32,
 	syncFullClaims bool,
+	finalizedBlockType etherman.BlockNumberFinality,
 ) (*BridgeSync, error) {
 	return newBridgeSync(
 		ctx,
@@ -95,6 +98,7 @@ func NewL2(
 		maxRetryAttemptsAfterError,
 		originNetwork,
 		syncFullClaims,
+		finalizedBlockType,
 	)
 }
 
@@ -113,6 +117,7 @@ func newBridgeSync(
 	maxRetryAttemptsAfterError int,
 	originNetwork uint32,
 	syncFullClaims bool,
+	finalizedBlockType etherman.BlockNumberFinality,
 ) (*BridgeSync, error) {
 	logger := log.WithFields("bridge-syncer", layerID)
 	processor, err := newProcessor(dbPath, logger)
@@ -151,6 +156,7 @@ func newBridgeSync(
 		appender,
 		[]common.Address{bridge},
 		rh,
+		finalizedBlockType,
 	)
 	if err != nil {
 		return nil, err
diff --git a/bridgesync/bridgesync_test.go b/bridgesync/bridgesync_test.go
index e2358d79..838967eb 100644
--- a/bridgesync/bridgesync_test.go
+++ b/bridgesync/bridgesync_test.go
@@ -56,6 +56,7 @@ func TestNewLx(t *testing.T) {
 		maxRetryAttemptsAfterError,
 		originNetwork,
 		false,
+		blockFinalityType,
 	)
 
 	assert.NoError(t, err)
@@ -77,6 +78,7 @@ func TestNewLx(t *testing.T) {
 		maxRetryAttemptsAfterError,
 		originNetwork,
 		false,
+		blockFinalityType,
 	)
 
 	assert.NoError(t, err)
diff --git a/bridgesync/e2e_test.go b/bridgesync/e2e_test.go
index 4b1fc94a..06a705f2 100644
--- a/bridgesync/e2e_test.go
+++ b/bridgesync/e2e_test.go
@@ -88,6 +88,8 @@ func TestBridgeEventE2E(t *testing.T) {
 		}
 	}
 
+	helpers.CommitBlocks(t, setup.L1Environment.SimBackend, 11, blockTime)
+
 	// Wait for syncer to catch up
 	time.Sleep(time.Second * 2) // sleeping since the processor could be up to date, but have pending reorgs
 	lb, err := setup.L1Environment.SimBackend.Client().BlockNumber(ctx)
diff --git a/cmd/run.go b/cmd/run.go
index cca10b60..b53260d6 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -526,6 +526,7 @@ func runL1InfoTreeSyncerIfNeeded(
 		cfg.L1InfoTreeSync.RetryAfterErrorPeriod.Duration,
 		cfg.L1InfoTreeSync.MaxRetryAttemptsAfterError,
 		l1infotreesync.FlagNone,
+		etherman.FinalizedBlock,
 	)
 	if err != nil {
 		log.Fatal(err)
@@ -727,6 +728,7 @@ func runBridgeSyncL1IfNeeded(
 		cfg.MaxRetryAttemptsAfterError,
 		rollupID,
 		false,
+		etherman.FinalizedBlock,
 	)
 	if err != nil {
 		log.Fatalf("error creating bridgeSyncL1: %s", err)
@@ -762,6 +764,7 @@ func runBridgeSyncL2IfNeeded(
 		cfg.MaxRetryAttemptsAfterError,
 		rollupID,
 		true,
+		etherman.LatestBlock,
 	)
 	if err != nil {
 		log.Fatalf("error creating bridgeSyncL2: %s", err)
diff --git a/etherman/types.go b/etherman/types.go
index 0f705899..64d31495 100644
--- a/etherman/types.go
+++ b/etherman/types.go
@@ -120,6 +120,10 @@ func (b *BlockNumberFinality) ToBlockNum() (*big.Int, error) {
 	}
 }
 
+func (b BlockNumberFinality) IsFinalized() bool {
+	return b == FinalizedBlock
+}
+
 type BlockNumber int64
 
 const (
diff --git a/l1infotreesync/e2e_test.go b/l1infotreesync/e2e_test.go
index ee4f464f..ff5c40e8 100644
--- a/l1infotreesync/e2e_test.go
+++ b/l1infotreesync/e2e_test.go
@@ -70,8 +70,8 @@ func TestE2E(t *testing.T) {
 	rdm.On("AddBlockToTrack", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)
-	syncer, err := l1infotreesync.New(ctx, dbPath, gerAddr, verifyAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3,
-		l1infotreesync.FlagAllowWrongContractsAddrs)
+	syncer, err := l1infotreesync.New(ctx, dbPath, gerAddr, verifyAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 25,
+		l1infotreesync.FlagAllowWrongContractsAddrs, etherman.SafeBlock)
 	require.NoError(t, err)
 
 	go syncer.Start(ctx)
@@ -159,7 +159,7 @@ func TestWithReorgs(t *testing.T) {
 	require.NoError(t, rd.Start(ctx))
 
 	syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 25,
-		l1infotreesync.FlagAllowWrongContractsAddrs)
+		l1infotreesync.FlagAllowWrongContractsAddrs, etherman.SafeBlock)
 	require.NoError(t, err)
 
 	go syncer.Start(ctx)
@@ -221,9 +221,6 @@ func TestWithReorgs(t *testing.T) {
 	// Block 4, 5, 6 after the fork
 	helpers.CommitBlocks(t, client, 3, time.Millisecond*500)
 
-	// Make sure syncer is up to date
-	waitForSyncerToCatchUp(ctx, t, syncer, client)
-
 	// Assert rollup exit root after the fork - should be zero since there are no events in the block after the fork
 	expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
 	require.NoError(t, err)
@@ -236,11 +233,12 @@ func TestWithReorgs(t *testing.T) {
 	require.NoError(t, err)
 	time.Sleep(time.Millisecond * 500)
 
+	helpers.CommitBlocks(t, client, 1, time.Millisecond*100)
+
 	// create some events and update the trees
 	updateL1InfoTreeAndRollupExitTree(2, 1)
 
-	// Block 4, 5, 6, 7 after the fork
-	helpers.CommitBlocks(t, client, 4, time.Millisecond*100)
+	helpers.CommitBlocks(t, client, 1, time.Millisecond*100)
 
 	// Make sure syncer is up to date
 	waitForSyncerToCatchUp(ctx, t, syncer, client)
@@ -274,7 +272,7 @@ func TestStressAndReorgs(t *testing.T) {
 	require.NoError(t, rd.Start(ctx))
 
 	syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 100,
-		l1infotreesync.FlagAllowWrongContractsAddrs)
+		l1infotreesync.FlagAllowWrongContractsAddrs, etherman.SafeBlock)
 	require.NoError(t, err)
 
 	go syncer.Start(ctx)
@@ -305,7 +303,7 @@ func TestStressAndReorgs(t *testing.T) {
 		}
 	}
 
-	helpers.CommitBlocks(t, client, 1, time.Millisecond*10)
+	helpers.CommitBlocks(t, client, 11, time.Millisecond*10)
 
 	waitForSyncerToCatchUp(ctx, t, syncer, client)
diff --git a/l1infotreesync/l1infotreesync.go b/l1infotreesync/l1infotreesync.go
index e6262ffb..df685d52 100644
--- a/l1infotreesync/l1infotreesync.go
+++ b/l1infotreesync/l1infotreesync.go
@@ -47,6 +47,7 @@ func New(
 	retryAfterErrorPeriod time.Duration,
 	maxRetryAttemptsAfterError int,
 	flags CreationFlags,
+	finalizedBlockType etherman.BlockNumberFinality,
 ) (*L1InfoTreeSync, error) {
 	processor, err := newProcessor(dbPath)
 	if err != nil {
@@ -83,6 +84,7 @@ func New(
 		appender,
 		[]common.Address{globalExitRoot, rollupManager},
 		rh,
+		finalizedBlockType,
 	)
 	if err != nil {
 		return nil, err
diff --git a/l1infotreesync/processor.go b/l1infotreesync/processor.go
index 9c91aeb6..75f73524 100644
--- a/l1infotreesync/processor.go
+++ b/l1infotreesync/processor.go
@@ -242,7 +242,6 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
 	shouldRollback := true
 	defer func() {
 		if shouldRollback {
-			log.Debugf("rolling back reorg, first reorged block: %d", firstReorgedBlock)
 			if errRllbck := tx.Rollback(); errRllbck != nil {
 				log.Errorf("error while rolling back tx %v", errRllbck)
 			}
@@ -269,6 +268,9 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
 	if err := tx.Commit(); err != nil {
 		return err
 	}
+
+	shouldRollback = false
+	sync.UnhaltIfAffectedRows(&p.halted, &p.haltedReason, &p.mu, rowsAffected)
 	return nil
 }
diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go
index 8b3ad790..d4fbbdf6 100644
--- a/sync/evmdownloader.go
+++ b/sync/evmdownloader.go
@@ -28,9 +28,10 @@ type EthClienter interface {
 
 type EVMDownloaderInterface interface {
 	WaitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64)
-	GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock
+	GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) EVMBlocks
 	GetLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log
 	GetBlockHeader(ctx context.Context, blockNum uint64) (EVMBlockHeader, bool)
+	GetLastFinalizedBlock(ctx context.Context) (*types.Header, error)
 }
 
 type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error
@@ -38,7 +39,8 @@ type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error
 type EVMDownloader struct {
 	syncBlockChunkSize uint64
 	EVMDownloaderInterface
-	log *log.Logger
+	log                *log.Logger
+	finalizedBlockType etherman.BlockNumberFinality
 }
 
 func NewEVMDownloader(
@@ -50,19 +52,41 @@ func NewEVMDownloader(
 	appender LogAppenderMap,
 	adressessToQuery []common.Address,
 	rh *RetryHandler,
+	finalizedBlockType etherman.BlockNumberFinality,
 ) (*EVMDownloader, error) {
 	logger := log.WithFields("syncer", syncerID)
 	finality, err := blockFinalityType.ToBlockNum()
 	if err != nil {
 		return nil, err
 	}
+
 	topicsToQuery := make([]common.Hash, 0, len(appender))
 	for topic := range appender {
 		topicsToQuery = append(topicsToQuery, topic)
 	}
+
+	fbtEthermanType := finalizedBlockType
+	fbt, err := finalizedBlockType.ToBlockNum()
+	if err != nil {
+		return nil, err
+	}
+
+	if fbt.Cmp(finality) > 0 {
+		// if someone configured the syncer to query blocks by Safe or Finalized block,
+		// the finalized block type should be at least the same as the block finality
+		fbt = finality
+		fbtEthermanType = blockFinalityType
+		logger.Warnf("finalized block type %s is greater than block finality %s, setting finalized block type to %s",
+			finalizedBlockType, blockFinalityType, fbtEthermanType)
+	}
+
+	logger.Infof("downloader initialized with block finality: %s, finalized block type: %s. SyncChunkSize: %d",
+		blockFinalityType, fbtEthermanType, syncBlockChunkSize)
+
 	return &EVMDownloader{
 		syncBlockChunkSize: syncBlockChunkSize,
 		log:                logger,
+		finalizedBlockType: fbtEthermanType,
 		EVMDownloaderInterface: &EVMDownloaderImplementation{
 			ethClient:        ethClient,
 			blockFinality:    finality,
@@ -72,12 +96,14 @@ func NewEVMDownloader(
 			adressessToQuery: adressessToQuery,
 			rh:               rh,
 			log:              logger,
+			finalizedBlockType: fbt,
 		},
 	}, nil
 }
 
 func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) {
 	lastBlock := d.WaitForNewBlocks(ctx, 0)
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -86,37 +112,76 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
 			return
 		default:
 		}
+
 		toBlock := fromBlock + d.syncBlockChunkSize
 		if toBlock > lastBlock {
 			toBlock = lastBlock
 		}
+
 		if fromBlock > toBlock {
-			d.log.Debugf(
+			d.log.Infof(
 				"waiting for new blocks, last block processed: %d, last block seen on L1: %d",
 				fromBlock-1, lastBlock,
 			)
 			lastBlock = d.WaitForNewBlocks(ctx, fromBlock-1)
 			continue
 		}
-		d.log.Debugf("getting events from block %d to %d", fromBlock, toBlock)
-		blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock)
-		for _, b := range blocks {
-			d.log.Debugf("sending block %d to the driver (with events)", b.Num)
-			downloadedCh <- b
+
+		lastFinalizedBlock, err := d.GetLastFinalizedBlock(ctx)
+		if err != nil {
+			d.log.Error("error getting last finalized block: ", err)
+			continue
 		}
-		if len(blocks) == 0 || blocks[len(blocks)-1].Num < toBlock {
-			// Indicate the last downloaded block if there are not events on it
-			d.log.Debugf("sending block %d to the driver (without events)", toBlock)
-			header, isCanceled := d.GetBlockHeader(ctx, toBlock)
-			if isCanceled {
-				return
+
+		lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64()
+
+		d.log.Infof("getting events from blocks %d to %d. lastFinalizedBlock: %d",
+			fromBlock, toBlock, lastFinalizedBlockNumber)
+		blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock)
+
+		if toBlock <= lastFinalizedBlockNumber {
+			d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber)
+			fromBlock = toBlock + 1
+
+			if blocks.Len() == 0 || blocks[blocks.Len()-1].Num < toBlock {
+				d.reportEmptyBlock(ctx, downloadedCh, toBlock, lastFinalizedBlockNumber)
 			}
+		} else {
+			d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber)
 
-			downloadedCh <- EVMBlock{
-				EVMBlockHeader: header,
+			if blocks.Len() == 0 {
+				if lastFinalizedBlockNumber > fromBlock &&
+					lastFinalizedBlockNumber-fromBlock > d.syncBlockChunkSize {
+					d.reportEmptyBlock(ctx, downloadedCh, fromBlock+d.syncBlockChunkSize, lastFinalizedBlockNumber)
+					fromBlock += d.syncBlockChunkSize + 1
+				}
+			} else {
+				fromBlock = blocks[blocks.Len()-1].Num + 1
 			}
 		}
-		fromBlock = toBlock + 1
+	}
 }
+
+func (d *EVMDownloader) reportBlocks(downloadedCh chan EVMBlock, blocks EVMBlocks, lastFinalizedBlock uint64) {
+	for _, block := range blocks {
+		d.log.Infof("sending block %d to the driver (with events)", block.Num)
+		block.IsFinalizedBlock = d.finalizedBlockType.IsFinalized() && block.Num <= lastFinalizedBlock
+		downloadedCh <- *block
+	}
+}
+
+func (d *EVMDownloader) reportEmptyBlock(ctx context.Context, downloadedCh chan EVMBlock,
+	blockNum, lastFinalizedBlock uint64) {
+	// Indicate the last downloaded block if there are no events on it
+	d.log.Debugf("sending block %d to the driver (without events)", blockNum)
+	header, isCanceled := d.GetBlockHeader(ctx, blockNum)
+	if isCanceled {
+		return
+	}
+
+	downloadedCh <- EVMBlock{
+		IsFinalizedBlock: d.finalizedBlockType.IsFinalized() && header.Num <= lastFinalizedBlock,
+		EVMBlockHeader:   header,
 	}
 }
@@ -129,6 +194,7 @@ type EVMDownloaderImplementation struct {
 	adressessToQuery []common.Address
 	rh               *RetryHandler
 	log              *log.Logger
+	finalizedBlockType *big.Int
 }
 
 func NewEVMDownloaderImplementation(
@@ -154,6 +220,10 @@ func NewEVMDownloaderImplementation(
 	}
 }
 
+func (d *EVMDownloaderImplementation) GetLastFinalizedBlock(ctx context.Context) (*types.Header, error) {
+	return d.ethClient.HeaderByNumber(ctx, d.finalizedBlockType)
+}
+
 func (d *EVMDownloaderImplementation) WaitForNewBlocks(
 	ctx context.Context, lastBlockSeen uint64,
 ) (newLastBlock uint64) {
@@ -184,12 +254,12 @@ func (d *EVMDownloaderImplementation) WaitForNewBlocks(
 		}
 	}
 }
 
-func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock {
+func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) EVMBlocks {
 	select {
 	case <-ctx.Done():
 		return nil
 	default:
-		blocks := []EVMBlock{}
+		blocks := EVMBlocks{}
 		logs := d.GetLogs(ctx, fromBlock, toBlock)
 		for _, l := range logs {
 			if len(blocks) == 0 || blocks[len(blocks)-1].Num < l.BlockNumber {
@@ -206,7 +276,7 @@ func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context,
 				)
 				return d.GetEventsByBlockRange(ctx, fromBlock, toBlock)
 			}
-			blocks = append(blocks, EVMBlock{
+			blocks = append(blocks, &EVMBlock{
 				EVMBlockHeader: EVMBlockHeader{
 					Num:  l.BlockNumber,
 					Hash: l.BlockHash,
@@ -219,7 +289,7 @@ func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context,
 
 		for {
 			attempts := 0
-			err := d.appender[l.Topics[0]](&blocks[len(blocks)-1], l)
+			err := d.appender[l.Topics[0]](blocks[len(blocks)-1], l)
 			if err != nil {
 				attempts++
 				d.log.Error("error trying to append log: ", err)
diff --git a/sync/evmdownloader_test.go b/sync/evmdownloader_test.go
index ef5a94cd..b4c600f1 100644
--- a/sync/evmdownloader_test.go
+++ b/sync/evmdownloader_test.go
@@ -35,7 +35,7 @@ func TestGetEventsByBlockRange(t *testing.T) {
 		description        string
 		inputLogs          []types.Log
 		fromBlock, toBlock uint64
-		expectedBlocks     []EVMBlock
+		expectedBlocks     EVMBlocks
 	}
 	testCases := []testCase{}
 	ctx := context.Background()
@@ -47,7 +47,7 @@
 		inputLogs:      []types.Log{},
 		fromBlock:      1,
 		toBlock:        3,
-		expectedBlocks: []EVMBlock{},
+		expectedBlocks: EVMBlocks{},
 	}
 	testCases = append(testCases, case0)
@@ -56,7 +56,7 @@
 	logsC1 := []types.Log{
 		*logC1,
 	}
-	blocksC1 := []EVMBlock{
+	blocksC1 := EVMBlocks{
 		{
 			EVMBlockHeader: EVMBlockHeader{
 				Num:  logC1.BlockNumber,
@@ -86,7 +86,7 @@
 		*logC2_3,
 		*logC2_4,
 	}
-	blocksC2 := []EVMBlock{
+	blocksC2 := []*EVMBlock{
 		{
 			EVMBlockHeader: EVMBlockHeader{
 				Num:  logC2_1.BlockNumber,
@@ -121,7 +121,7 @@
 		*logC3_3,
 		*logC3_4,
 	}
-	blocksC3 := []EVMBlock{
+	blocksC3 := EVMBlocks{
 		{
 			EVMBlockHeader: EVMBlockHeader{
 				Num:  logC3_1.BlockNumber,
@@ -206,114 +206,167 @@ func TestDownload(t *testing.T) {
 	downloadCh := make(chan EVMBlock, 1)
 	ctx := context.Background()
 	ctx1, cancel := context.WithCancel(ctx)
-	expectedBlocks := []EVMBlock{}
+	expectedBlocks := EVMBlocks{}
 	dwnldr, _ := NewTestDownloader(t, time.Millisecond*100)
 	dwnldr.EVMDownloaderInterface = d
 
 	d.On("WaitForNewBlocks", mock.Anything, uint64(0)).
 		Return(uint64(1))
-	// iteratiion 0:
-	// last block is 1, download that block (no events and wait)
-	b1 := EVMBlock{
-		EVMBlockHeader: EVMBlockHeader{
-			Num:  1,
-			Hash: common.HexToHash("01"),
-		},
+
+	lastFinalizedBlock := &types.Header{Number: big.NewInt(1)}
+	createEVMBlockFn := func(header *types.Header, isSafeBlock bool) *EVMBlock {
+		return &EVMBlock{
+			IsFinalizedBlock: isSafeBlock,
+			EVMBlockHeader: EVMBlockHeader{
+				Num:        header.Number.Uint64(),
+				Hash:       header.Hash(),
+				ParentHash: header.ParentHash,
+				Timestamp:  header.Time,
+			},
+		}
 	}
-	expectedBlocks = append(expectedBlocks, b1)
-	d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(1)).
-		Return([]EVMBlock{}, false)
-	d.On("GetBlockHeader", mock.Anything, uint64(1)).
-		Return(b1.EVMBlockHeader, false)
-	// iteration 1: wait for next block to be created
-	d.On("WaitForNewBlocks", mock.Anything, uint64(1)).
-		After(time.Millisecond * 100).
-		Return(uint64(2)).Once()
+
+	// iteration 0:
+	// last block is 1, download that block (no events and wait)
+	b0 := createEVMBlockFn(lastFinalizedBlock, true)
+	expectedBlocks = append(expectedBlocks, b0)
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
+	d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(1)).
+		Return(EVMBlocks{}, false).Once()
+	d.On("GetBlockHeader", mock.Anything, uint64(1)).Return(b0.EVMBlockHeader, false).Once()
 
-	// iteration 2: block 2 has events
-	b2 := EVMBlock{
-		EVMBlockHeader: EVMBlockHeader{
-			Num:  2,
-			Hash: common.HexToHash("02"),
-		},
-	}
+	// iteration 1: we have a new block, so increase toBlock (no events)
+	lastFinalizedBlock = &types.Header{Number: big.NewInt(2)}
+	b2 := createEVMBlockFn(lastFinalizedBlock, true)
 	expectedBlocks = append(expectedBlocks, b2)
+	d.On("WaitForNewBlocks", mock.Anything, uint64(1)).
+		Return(uint64(2))
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
 	d.On("GetEventsByBlockRange", mock.Anything, uint64(2), uint64(2)).
-		Return([]EVMBlock{b2}, false)
+		Return(EVMBlocks{}, false).Once()
+	d.On("GetBlockHeader", mock.Anything, uint64(2)).Return(b2.EVMBlockHeader, false).Once()
 
-	// iteration 3: wait for next block to be created (jump to block 8)
+	// iteration 2: wait for next block to be created (jump to block 8)
 	d.On("WaitForNewBlocks", mock.Anything, uint64(2)).
 		After(time.Millisecond * 100).
 		Return(uint64(8)).Once()
 
-	// iteration 4: blocks 6 and 7 have events
-	b6 := EVMBlock{
+	// iteration 3: blocks 6 and 7 have events, last finalized block is 5
+	lastFinalizedBlock = &types.Header{Number: big.NewInt(5)}
+	b6 := &EVMBlock{
 		EVMBlockHeader: EVMBlockHeader{
 			Num:  6,
 			Hash: common.HexToHash("06"),
 		},
 		Events: []interface{}{"06"},
 	}
-	b7 := EVMBlock{
+	b7 := &EVMBlock{
 		EVMBlockHeader: EVMBlockHeader{
 			Num:  7,
 			Hash: common.HexToHash("07"),
 		},
 		Events: []interface{}{"07"},
 	}
-	b8 := EVMBlock{
-		EVMBlockHeader: EVMBlockHeader{
-			Num:  8,
-			Hash: common.HexToHash("08"),
-		},
-	}
-	expectedBlocks = append(expectedBlocks, b6, b7, b8)
+	expectedBlocks = append(expectedBlocks, b6, b7)
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
 	d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(8)).
-		Return([]EVMBlock{b6, b7}, false)
-	d.On("GetBlockHeader", mock.Anything, uint64(8)).
-		Return(b8.EVMBlockHeader, false)
-
-	// iteration 5: wait for next block to be created (jump to block 30)
+		Return(EVMBlocks{b6, b7}, false)
+
+	// iteration 4: finalized block is now block 8, report the finalized block
+	lastFinalizedBlock = &types.Header{Number: big.NewInt(8)}
+	b8 := createEVMBlockFn(lastFinalizedBlock, true)
+	expectedBlocks = append(expectedBlocks, b8)
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
+	d.On("GetEventsByBlockRange", mock.Anything, uint64(8), uint64(8)).
+		Return(EVMBlocks{}, false)
+	d.On("GetBlockHeader", mock.Anything, uint64(8)).Return(b8.EVMBlockHeader, false).Once()
+
+	// iteration 5: from block 9 to 19, no events
+	lastFinalizedBlock = &types.Header{Number: big.NewInt(15)}
 	d.On("WaitForNewBlocks", mock.Anything, uint64(8)).
 		After(time.Millisecond * 100).
-		Return(uint64(30)).Once()
+		Return(uint64(19)).Once()
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
+	d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)).
+		Return(EVMBlocks{}, false)
 
-	// iteration 6: from block 9 to 19, no events
-	b19 := EVMBlock{
-		EVMBlockHeader: EVMBlockHeader{
-			Num:  19,
-			Hash: common.HexToHash("19"),
-		},
-	}
-	expectedBlocks = append(expectedBlocks, b19)
+	// iteration 6: last finalized block is now 20, no events, report empty block
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Once()
 	d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)).
-		Return([]EVMBlock{}, false)
-	d.On("GetBlockHeader", mock.Anything, uint64(19)).
-		Return(b19.EVMBlockHeader, false)
+		Return(EVMBlocks{}, false)
 
-	// iteration 7: from block 20 to 30, events on last block
-	b30 := EVMBlock{
+	d.On("WaitForNewBlocks", mock.Anything, uint64(19)).
+		After(time.Millisecond * 100).
+		Return(uint64(20)).Once()
+	b19 := createEVMBlockFn(&types.Header{Number: big.NewInt(19)}, true)
+	expectedBlocks = append(expectedBlocks, b19)
+	d.On("GetBlockHeader", mock.Anything, uint64(19)).Return(b19.EVMBlockHeader, false) // reporting empty finalized block
+
+	// iteration 8: last finalized block is 21, no events
+	b20 := createEVMBlockFn(&types.Header{Number: big.NewInt(20)}, true)
+	expectedBlocks = append(expectedBlocks, b20)
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(21)}, nil).Once()
+	d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(20)).
+		Return(EVMBlocks{}, false)
+	d.On("GetBlockHeader", mock.Anything, uint64(20)).Return(b20.EVMBlockHeader, false) // reporting empty finalized block
+
+	// iteration 9: last finalized block is 22, no events
+	d.On("WaitForNewBlocks", mock.Anything, uint64(20)).
+		After(time.Millisecond * 100).
+		Return(uint64(21)).Once()
+	b21 := createEVMBlockFn(&types.Header{Number: big.NewInt(21)}, true)
+	expectedBlocks = append(expectedBlocks, b21)
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(22)}, nil).Once()
+	d.On("GetEventsByBlockRange", mock.Anything, uint64(21), uint64(21)).
+		Return(EVMBlocks{}, false)
+	d.On("GetBlockHeader", mock.Anything, uint64(21)).Return(b21.EVMBlockHeader, false) // reporting empty finalized block
+
+	// iteration 10: last finalized block is 23, no events
+	d.On("WaitForNewBlocks", mock.Anything, uint64(21)).
+		After(time.Millisecond * 100).
+		Return(uint64(22)).Once()
+	b22 := createEVMBlockFn(&types.Header{Number: big.NewInt(22)}, true)
+	expectedBlocks = append(expectedBlocks, b22)
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(23)}, nil).Once()
+	d.On("GetEventsByBlockRange", mock.Anything, uint64(22), uint64(22)).
+		Return(EVMBlocks{}, false)
+	d.On("GetBlockHeader", mock.Anything, uint64(22)).Return(b22.EVMBlockHeader, false) // reporting empty finalized block
+
+	// iteration 11: last finalized block is still 23, no events
+	d.On("WaitForNewBlocks", mock.Anything, uint64(22)).
+		After(time.Millisecond * 100).
+		Return(uint64(23)).Once()
+	b23 := createEVMBlockFn(&types.Header{Number: big.NewInt(23)}, true)
+	expectedBlocks = append(expectedBlocks, b23)
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(23)}, nil).Once()
+	d.On("GetEventsByBlockRange", mock.Anything, uint64(23), uint64(23)).
+		Return(EVMBlocks{}, false)
+	d.On("GetBlockHeader", mock.Anything, uint64(23)).Return(b23.EVMBlockHeader, false) // reporting empty finalized block
+
+	// iteration 12: finalized block is 24, has events
+	d.On("WaitForNewBlocks", mock.Anything, uint64(23)).
+		After(time.Millisecond * 100).
+		Return(uint64(24)).Once()
+	b24 := &EVMBlock{
 		EVMBlockHeader: EVMBlockHeader{
-			Num:  30,
-			Hash: common.HexToHash("30"),
+			Num:  24,
+			Hash: common.HexToHash("24"),
 		},
-		Events: []interface{}{testEvent(common.HexToHash("30"))},
+		Events: []interface{}{testEvent(common.HexToHash("24"))},
 	}
-	expectedBlocks = append(expectedBlocks, b30)
-	d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(30)).
-		Return([]EVMBlock{b30}, false)
+	expectedBlocks = append(expectedBlocks, b24)
+	d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(24)}, nil).Once()
+	d.On("GetEventsByBlockRange", mock.Anything, uint64(24), uint64(24)).
+		Return(EVMBlocks{b24}, false)
 
-	// iteration 8: wait for next block to be created (jump to block 35)
-	d.On("WaitForNewBlocks", mock.Anything, uint64(30)).
-		After(time.Millisecond * 100).
-		Return(uint64(35)).Once()
+	// iteration 13: closing the downloader
+	d.On("WaitForNewBlocks", mock.Anything, uint64(24)).Return(uint64(25)).After(time.Millisecond * 100).Once()
 
 	go dwnldr.Download(ctx1, 0, downloadCh)
 	for _, expectedBlock := range expectedBlocks {
 		actualBlock := <-downloadCh
 		log.Debugf("block %d received!", actualBlock.Num)
-		require.Equal(t, expectedBlock, actualBlock)
+		require.Equal(t, *expectedBlock, actualBlock)
 	}
 	log.Debug("canceling")
 	cancel()
@@ -451,7 +504,11 @@ func NewTestDownloader(t *testing.T, retryPeriod time.Duration) (*EVMDownloader,
 		RetryAfterErrorPeriod: retryPeriod,
 	}
 	clientMock := NewL2Mock(t)
-	d, err := NewEVMDownloader("test", clientMock, syncBlockChunck, etherman.LatestBlock, time.Millisecond, buildAppender(), []common.Address{contractAddr}, rh)
+	d, err := NewEVMDownloader("test",
+		clientMock, syncBlockChunck, etherman.LatestBlock, time.Millisecond,
+		buildAppender(), []common.Address{contractAddr}, rh,
+		etherman.FinalizedBlock,
+	)
 	require.NoError(t, err)
 	return d, clientMock
 }
diff --git a/sync/evmdriver.go b/sync/evmdriver.go
index b8e706b9..7b7dba74 100644
--- a/sync/evmdriver.go
+++ b/sync/evmdriver.go
@@ -9,11 +9,6 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 )
 
-type evmDownloaderFull interface {
-	EVMDownloaderInterface
-	downloader
-}
-
 type downloader interface {
 	Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock)
 }
@@ -97,9 +92,13 @@ reset:
 			d.log.Info("sync stopped due to context done")
 			cancel()
 			return
-		case b := <-downloadCh:
-			d.log.Debugf("handleNewBlock, blockNum: %d, blockHash: %s", b.Num, b.Hash)
-			d.handleNewBlock(ctx, cancel, b)
+		case b, ok := <-downloadCh:
+			if ok {
+				// when the channel is closed it delivers a zero-value block (num = 0, empty hash),
+				// since blocks are sent by value rather than by reference; skip handling it
+				d.log.Infof("handleNewBlock, blockNum: %d, blockHash: %s", b.Num, b.Hash)
+				d.handleNewBlock(ctx, cancel, b)
+			}
 		case firstReorgedBlock := <-d.reorgSub.ReorgedBlock:
 			d.log.Debug("handleReorg from block: ", firstReorgedBlock)
 			d.handleReorg(ctx, cancel, firstReorgedBlock)
@@ -118,11 +117,15 @@ func (d *EVMDriver) handleNewBlock(ctx context.Context, cancel context.CancelFun
 			d.log.Warnf("context canceled while adding block %d to tracker", b.Num)
 			return
 		default:
-			err := d.reorgDetector.AddBlockToTrack(ctx, d.reorgDetectorID, b.Num, b.Hash)
-			if err != nil {
-				attempts++
-				d.log.Errorf("error adding block %d to tracker: %v", b.Num, err)
-				d.rh.Handle("handleNewBlock", attempts)
+			if !b.IsFinalizedBlock {
+				err := d.reorgDetector.AddBlockToTrack(ctx, d.reorgDetectorID, b.Num, b.Hash)
+				if err != nil {
+					attempts++
+					d.log.Errorf("error adding block %d to tracker: %v", b.Num, err)
+					d.rh.Handle("handleNewBlock", attempts)
+				} else {
+					succeed = true
+				}
 			} else {
 				succeed = true
 			}
diff --git a/sync/evmtypes.go b/sync/evmtypes.go
index d242dbc4..739154f9 100644
--- a/sync/evmtypes.go
+++ b/sync/evmtypes.go
@@ -2,9 +2,16 @@ package sync
 
 import "github.com/ethereum/go-ethereum/common"
 
+type EVMBlocks []*EVMBlock
+
+func (e EVMBlocks) Len() int {
+	return len(e)
+}
+
 type EVMBlock struct {
 	EVMBlockHeader
-	Events []interface{}
+	IsFinalizedBlock bool
+	Events           []interface{}
 }
 
 type EVMBlockHeader struct {
diff --git a/sync/mock_downloader_test.go b/sync/mock_downloader_test.go
index 662f49f7..43fed1ed 100644
--- a/sync/mock_downloader_test.go
+++ b/sync/mock_downloader_test.go
@@ -115,53 +115,53 @@ func (_c *EVMDownloaderMock_GetBlockHeader_Call) RunAndReturn(run func(context.C
 }
 
 // GetEventsByBlockRange provides a mock function with given fields: ctx, fromBlock, toBlock
-func (_m *EVMDownloaderMock) GetEventsByBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) []EVMBlock {
+func (_m *EVMDownloaderMock) GetEventsByBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) EVMBlocks {
 	ret := _m.Called(ctx, fromBlock, toBlock)
 
 	if len(ret) == 0 {
 		panic("no return value specified for GetEventsByBlockRange")
 	}
 
-	var r0 []EVMBlock
-	if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) []EVMBlock); ok {
+	var r0 EVMBlocks
+	if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) EVMBlocks); ok {
 		r0 = rf(ctx, fromBlock, toBlock)
 	} else {
 		if ret.Get(0) != nil {
-			r0 = ret.Get(0).([]EVMBlock)
+			r0 = ret.Get(0).(EVMBlocks)
 		}
 	}
 
 	return r0
 }
 
-// EVMDownloaderMock_GetEventsByBlockRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetEventsByBlockRange'
-type EVMDownloaderMock_GetEventsByBlockRange_Call struct {
-	*mock.Call
-}
+// GetLastFinalizedBlock provides a mock function with given fields: ctx
+func (_m *EVMDownloaderMock) GetLastFinalizedBlock(ctx context.Context) (*types.Header, error) {
+	ret := _m.Called(ctx)
 
-// GetEventsByBlockRange is a helper method to define mock.On call
-//   - ctx context.Context
-//   - fromBlock uint64
-//   - toBlock uint64
-func (_e *EVMDownloaderMock_Expecter) GetEventsByBlockRange(ctx interface{}, fromBlock interface{}, toBlock interface{}) *EVMDownloaderMock_GetEventsByBlockRange_Call {
-	return &EVMDownloaderMock_GetEventsByBlockRange_Call{Call: _e.mock.On("GetEventsByBlockRange", ctx, fromBlock, toBlock)}
-}
+	if len(ret) == 0 {
+		panic("no return value specified for GetLastFinalizedBlock")
+	}
 
-func (_c *EVMDownloaderMock_GetEventsByBlockRange_Call) Run(run func(ctx context.Context, fromBlock uint64, toBlock uint64)) *EVMDownloaderMock_GetEventsByBlockRange_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(uint64), args[2].(uint64))
-	})
-	return _c
-}
+	var r0 *types.Header
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context) (*types.Header, error)); ok {
+		return rf(ctx)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context) *types.Header); ok {
+		r0 = rf(ctx)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*types.Header)
+		}
+	}
 
-func (_c *EVMDownloaderMock_GetEventsByBlockRange_Call) Return(_a0 []EVMBlock) *EVMDownloaderMock_GetEventsByBlockRange_Call {
-	_c.Call.Return(_a0)
-	return _c
-}
+	if rf, ok := ret.Get(1).(func(context.Context) error); ok {
+		r1 = rf(ctx)
+	} else {
+		r1 = ret.Error(1)
+	}
 
-func (_c *EVMDownloaderMock_GetEventsByBlockRange_Call) RunAndReturn(run func(context.Context, uint64, uint64) []EVMBlock) *EVMDownloaderMock_GetEventsByBlockRange_Call {
-	_c.Call.Return(run)
-	return _c
+	return r0, r1
 }
 
 // GetLogs provides a mock function with given fields: ctx, fromBlock, toBlock
diff --git a/test/helpers/e2e.go b/test/helpers/e2e.go
index c7fb908e..d8c32546 100644
--- a/test/helpers/e2e.go
+++ b/test/helpers/e2e.go
@@ -13,6 +13,7 @@ import (
 	"github.com/0xPolygon/cdk/aggoracle"
 	"github.com/0xPolygon/cdk/aggoracle/chaingersender"
 	"github.com/0xPolygon/cdk/bridgesync"
+	cfgTypes "github.com/0xPolygon/cdk/config/types"
 	"github.com/0xPolygon/cdk/etherman"
 	"github.com/0xPolygon/cdk/l1infotreesync"
 	"github.com/0xPolygon/cdk/log"
@@ -104,7 +105,10 @@ func L1Setup(t *testing.T) *L1Environment {
 
 	// Reorg detector
 	dbPathReorgDetectorL1 := path.Join(t.TempDir(), "ReorgDetectorL1.sqlite")
-	rdL1, err := reorgdetector.New(l1Client.Client(), reorgdetector.Config{DBPath: dbPathReorgDetectorL1})
+	rdL1, err := reorgdetector.New(l1Client.Client(), reorgdetector.Config{
+		DBPath:              dbPathReorgDetectorL1,
+		CheckReorgsInterval: cfgTypes.Duration{Duration: time.Millisecond * 100}, //nolint:mnd
+	})
 	require.NoError(t, err)
 
 	go rdL1.Start(ctx) //nolint:errcheck
@@ -117,6 +121,7 @@ func L1Setup(t *testing.T) *L1Environment {
 		rdL1, l1Client.Client(),
 		time.Millisecond, 0, periodRetry,
 		retries, l1infotreesync.FlagAllowWrongContractsAddrs,
+		etherman.SafeBlock,
 	)
 	require.NoError(t, err)
 
@@ -128,7 +133,7 @@ func L1Setup(t *testing.T) *L1Environment {
 		originNetwork = 1
 		initialBlock  = 0
 		retryPeriod   = 0
-		retriesCount  = 0
+		retriesCount  = 10
 	)
 
 	// Bridge sync
@@ -138,7 +143,7 @@ func L1Setup(t *testing.T) *L1Environment {
 		ctx, dbPathBridgeSyncL1, bridgeL1Addr,
 		syncBlockChunks, etherman.LatestBlock, rdL1, testClient,
 		initialBlock, waitForNewBlocksPeriod, retryPeriod,
-		retriesCount, originNetwork, false)
+		retriesCount, originNetwork, false, etherman.SafeBlock)
 	require.NoError(t, err)
 
 	go bridgeL1Sync.Start(ctx)
@@ -177,7 +182,9 @@ func L2Setup(t *testing.T) *L2Environment {
 
 	// Reorg detector
 	dbPathReorgL2 := path.Join(t.TempDir(), "ReorgDetectorL2.sqlite")
-	rdL2, err := reorgdetector.New(l2Client.Client(), reorgdetector.Config{DBPath: dbPathReorgL2})
+	rdL2, err := reorgdetector.New(l2Client.Client(), reorgdetector.Config{
+		DBPath:              dbPathReorgL2,
+		CheckReorgsInterval: cfgTypes.Duration{Duration: time.Millisecond * 100}}) //nolint:mnd
 	require.NoError(t, err)
 
 	go rdL2.Start(ctx) //nolint:errcheck
@@ -198,7 +205,7 @@ func L2Setup(t *testing.T) *L2Environment {
 		ctx, dbPathL2BridgeSync, bridgeL2Addr,
 		syncBlockChunks, etherman.LatestBlock, rdL2, testClient,
 		initialBlock, waitForNewBlocksPeriod, retryPeriod,
-		retriesCount, originNetwork, false)
+		retriesCount, originNetwork, false, etherman.LatestBlock)
 	require.NoError(t, err)
 
 	go bridgeL2Sync.Start(ctx)
diff --git a/test/helpers/wait.go b/test/helpers/wait.go
index 86a6f9fb..54715ecd 100644
--- a/test/helpers/wait.go
+++ b/test/helpers/wait.go
@@ -2,7 +2,7 @@ package helpers
 
 import (
 	"context"
-	"errors"
+	"fmt"
 	"testing"
 	"time"
 
@@ -19,14 +19,18 @@ func RequireProcessorUpdated(t *testing.T, processor Processorer, targetBlock ui
 		maxIterations         = 100
 		sleepTimePerIteration = time.Millisecond * 10
 	)
+	var (
+		lpb uint64
+		err error
+	)
 	ctx := context.Background()
 	for i := 0; i < maxIterations; i++ {
-		lpb, err := processor.GetLastProcessedBlock(ctx)
+		lpb, err = processor.GetLastProcessedBlock(ctx)
 		require.NoError(t, err)
 		if targetBlock <= lpb {
 			return
 		}
 		time.Sleep(sleepTimePerIteration)
 	}
-	require.NoError(t, errors.New("processor not updated"))
+	require.NoError(t, fmt.Errorf("processor not updated. Last block: %d, target block: %d", lpb, targetBlock))
 }