Skip to content

Commit

Permalink
fix: driver handling closing download channel
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Jan 20, 2025
1 parent 00b33d0 commit 55df143
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 21 deletions.
12 changes: 5 additions & 7 deletions l1infotreesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestE2E(t *testing.T) {
rdm.On("AddBlockToTrack", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)
syncer, err := l1infotreesync.New(ctx, dbPath, gerAddr, verifyAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3,
syncer, err := l1infotreesync.New(ctx, dbPath, gerAddr, verifyAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 25,
l1infotreesync.FlagAllowWrongContractsAddrs)
require.NoError(t, err)

Expand Down Expand Up @@ -228,9 +228,6 @@ func TestWithReorgs(t *testing.T) {
// Block 4, 5, 6 after the fork
commitBlocks(t, client, 3, time.Millisecond*500)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root after the fork - should be zero since there are no events in the block after the fork
expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
Expand All @@ -244,11 +241,12 @@ func TestWithReorgs(t *testing.T) {
require.NoError(t, err)
time.Sleep(time.Millisecond * 500)

commitBlocks(t, client, 1, time.Millisecond*100)

// create some events and update the trees
updateL1InfoTreeAndRollupExitTree(2, 1)

// Block 4, 5, 6, 7 after the fork
commitBlocks(t, client, 4, time.Millisecond*100)
commitBlocks(t, client, 1, time.Millisecond*100)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)
Expand Down Expand Up @@ -323,7 +321,7 @@ func TestStressAndReorgs(t *testing.T) {
}
}

commitBlocks(t, client, 1, time.Millisecond*10)
commitBlocks(t, client, 11, time.Millisecond*10)

waitForSyncerToCatchUp(ctx, t, syncer, client)

Expand Down
21 changes: 10 additions & 11 deletions sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewEVMDownloader(
func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) {
lastBlock := d.WaitForNewBlocks(ctx, 0)
toBlock := fromBlock
automaticToBlockResize := true
//automaticToBlockResize := true

Check failure on line 79 in sync/evmdownloader.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)

for {
select {
Expand All @@ -87,14 +87,14 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
default:
}

if automaticToBlockResize {
toBlock = fromBlock + d.syncBlockChunkSize
if toBlock > lastBlock {
toBlock = lastBlock
}
//if automaticToBlockResize {

Check failure on line 90 in sync/evmdownloader.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)
toBlock = fromBlock + d.syncBlockChunkSize
if toBlock > lastBlock {
toBlock = lastBlock
}
//}

automaticToBlockResize = true // reset the flag
//automaticToBlockResize = true // reset the flag

Check failure on line 97 in sync/evmdownloader.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)

if fromBlock > toBlock {
d.log.Debugf(
Expand Down Expand Up @@ -141,8 +141,6 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
if lastFinalizedBlockNumber > toBlock {
// we might be behind a lot, so go until last finalized block
toBlock = lastFinalizedBlockNumber + 1
} else {
toBlock++
}

if lastFinalizedBlockNumber-fromBlock >= d.syncBlockChunkSize {
Expand All @@ -153,7 +151,7 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
fromBlock = lastFinalizedBlockNumber
}

automaticToBlockResize = false
//automaticToBlockResize = false

continue
} else if blocks[blocks.Len()-1].Num <= lastFinalizedBlockNumber {
Expand All @@ -162,10 +160,11 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
// and we do not need to track it in the reorg detector
reportBlocksFn(blocks.Len())
fromBlock = lastFinalizedBlockNumber + 1
} else if blocks[len(blocks)-1].Num < toBlock {
} else if blocks[blocks.Len()-1].Num < toBlock {
// if we have logs in some of the blocks, and they are not all finalized,
// check if we have finalized blocks in gotten range, report them and
// set the from block from the last finalized block and keep increasing the range
// if not keep getting that range to protect us from possible mishandling of block hashes
lastFinalizedBlock, index, exists := blocks.LastFinalizedBlock(lastFinalizedBlockNumber)
if exists {
reportBlocksFn(index + 1) // num of blocks to report is index + 1 since index is zero based
Expand Down
10 changes: 7 additions & 3 deletions sync/evmdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ reset:
cancellableCtx, cancel := context.WithCancel(ctx)
defer cancel()

log.Info("Starting sync...", " lastProcessedBlock", lastProcessedBlock)
log.Info("Starting sync...", " lastProcessedBlock ", lastProcessedBlock)
// start downloading
downloadCh := make(chan EVMBlock, d.downloadBufferSize)
go d.downloader.Download(cancellableCtx, lastProcessedBlock+1, downloadCh)
Expand All @@ -92,9 +92,13 @@ reset:
d.log.Info("sync stopped due to context done")
cancel()
return
case b := <-downloadCh:
case b, ok := <-downloadCh:
d.log.Debugf("handleNewBlock, blockNum: %d, blockHash: %s", b.Num, b.Hash)
d.handleNewBlock(ctx, cancel, b)
if ok {
// when channel is closing, it is sending an empty block with num = 0, and empty hash
// because it is not passing object by reference, but by value, so do not handle that since it is closing
d.handleNewBlock(ctx, cancel, b)
}
case firstReorgedBlock := <-d.reorgSub.ReorgedBlock:
d.log.Info("handleReorg from block: ", firstReorgedBlock)
d.handleReorg(ctx, cancel, firstReorgedBlock)
Expand Down

0 comments on commit 55df143

Please sign in to comment.