Skip to content

Commit

Permalink
fix: new logic
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Jan 21, 2025
1 parent b232918 commit 3137fa6
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 79 deletions.
57 changes: 15 additions & 42 deletions sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func NewEVMDownloader(

func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) {
lastBlock := d.WaitForNewBlocks(ctx, 0)
toBlock := fromBlock

for {
select {
Expand All @@ -86,14 +85,14 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
default:
}

toBlock = fromBlock + d.syncBlockChunkSize
toBlock := fromBlock + d.syncBlockChunkSize
if toBlock > lastBlock {
toBlock = lastBlock
}

if fromBlock > toBlock {
d.log.Debugf(
"waiting for new blocks, last block processed %d, last block seen on L1 %d",
d.log.Infof(
"waiting for new blocks, last block processed: %d, last block seen on L1: %d",
fromBlock-1, lastBlock,
)
lastBlock = d.WaitForNewBlocks(ctx, fromBlock-1)
Expand All @@ -108,12 +107,13 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download

lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64()

d.log.Debugf("getting events from blocks %d to %d", fromBlock, toBlock)
d.log.Infof("getting events from blocks %d to %d. lastFinalizedBlock: %d",
fromBlock, toBlock, lastFinalizedBlockNumber)
blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock)

reportBlocksFn := func(numOfBlocksToReport int) {
for i := 0; i < numOfBlocksToReport; i++ {
d.log.Debugf("sending block %d to the driver (with events)", blocks[i].Num)
d.log.Infof("sending block %d to the driver (with events)", blocks[i].Num)
downloadedCh <- blocks[i]
}
}
Expand All @@ -131,46 +131,19 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
}
}

if blocks.Len() == 0 {
// we have no events, keep increasing the block range until we hit a log
if lastFinalizedBlockNumber > toBlock {
// we might be behind a lot, so go until last finalized block
toBlock = lastFinalizedBlockNumber
lastBlock = lastFinalizedBlockNumber
}

if lastFinalizedBlockNumber-fromBlock >= d.syncBlockChunkSize {
// if we already got a lot of finalized blocks that are empty, report an empty block
// to the driver to indicate that we are still processing the chain
// this is mainly needed for tests
reportEmptyBlockFn(lastFinalizedBlockNumber)
fromBlock = lastFinalizedBlockNumber
}

continue
} else if blocks[blocks.Len()-1].Num <= lastFinalizedBlockNumber {
// if the last block we have logs for is less than or equal to the last finalized block,
// report all of the blocks without the need to report the last empty block, since it is finalized
// and we do not need to track it in the reorg detector
if toBlock <= lastFinalizedBlockNumber {
reportBlocksFn(blocks.Len())
fromBlock = toBlock + 1
} else if blocks[blocks.Len()-1].Num < toBlock {
// if we have logs in some of the blocks, and they are not all finalized,
// check if we have finalized blocks in gotten range, report them and
// set the from block from the last finalized block and keep increasing the range
// if not keep getting that range to protect us from possible mishandling of block hashes
lastFinalizedBlock, index, exists := blocks.LastFinalizedBlock(lastFinalizedBlockNumber)
if exists {
reportBlocksFn(index + 1) // num of blocks to report is index + 1 since index is zero based
fromBlock = lastFinalizedBlock + 1
continue

if blocks.Len() == 0 || blocks[blocks.Len()-1].Num < toBlock {
reportEmptyBlockFn(toBlock)
}
} else {
// if we have logs in the last block, just report all of them and continue
// reorg detector will handle the reorg since the last block has events,
// and we are not afraid to have missaligned hashes at this point
reportBlocksFn(blocks.Len())
fromBlock = toBlock + 1
lastFinalizedBlockNum, i, found := blocks.LastFinalizedBlock(lastFinalizedBlockNumber)
if found {
reportBlocksFn(i + 1)
fromBlock = lastFinalizedBlockNum + 1
}
}
}
}
Expand Down
111 changes: 74 additions & 37 deletions sync/evmdownloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,35 +213,45 @@ func TestDownload(t *testing.T) {
d.On("WaitForNewBlocks", mock.Anything, uint64(0)).
Return(uint64(1))

lastFinalizedBlock := &types.Header{Number: big.NewInt(1)}
createEVMBlockFn := func(header *types.Header) EVMBlock {
return EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: header.Number.Uint64(),
Hash: header.Hash(),
ParentHash: header.ParentHash,
Timestamp: header.Time,
},
}
}

// iteration 0:
// last block is 1, download that block (no events and wait)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(1)}, nil).Once()
b0 := createEVMBlockFn(lastFinalizedBlock)
expectedBlocks = append(expectedBlocks, b0)
d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(1)).
Return(EVMBlocks{}, false).Once()
d.On("GetBlockHeader", mock.Anything, uint64(1)).Return(b0.EVMBlockHeader, false).Once()

// iteration 1: we have a new block, so increase to block
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(2)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(1)).
Return(EVMBlocks{}, false).Once()

// iteration 2: block 2 has events
b2 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 2,
Hash: common.HexToHash("02"),
},
}
// iteration 1: we have a new block, so increase to block (no events)
lastFinalizedBlock = &types.Header{Number: big.NewInt(2)}
b2 := createEVMBlockFn(lastFinalizedBlock)
expectedBlocks = append(expectedBlocks, b2)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(2)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(2)).
Return(EVMBlocks{b2}, false).Once()
d.On("WaitForNewBlocks", mock.Anything, uint64(1)).
Return(uint64(2))
d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(2), uint64(2)).
Return(EVMBlocks{}, false).Once()
d.On("GetBlockHeader", mock.Anything, uint64(2)).Return(b2.EVMBlockHeader, false).Once()

// iteration 3: wait for next block to be created (jump to block 8)
// iteration 2: wait for next block to be created (jump to block 8)
d.On("WaitForNewBlocks", mock.Anything, uint64(2)).
After(time.Millisecond * 100).
Return(uint64(8)).Once()

// iteration 4: blocks 6 and 7 have events, but last finalized block is 5
// iteration 3: blocks 6 and 7 have events, but last finalized block is 5
lastFinalizedBlock = &types.Header{Number: big.NewInt(5)}
b6 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 6,
Expand All @@ -257,58 +267,85 @@ func TestDownload(t *testing.T) {
Events: []interface{}{"07"},
}
expectedBlocks = append(expectedBlocks, b6, b7)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(5)}, nil).Once()
d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(8)).
Return(EVMBlocks{b6, b7}, false)

// iteration 5: finalized block is now block 8, we report events b6 and b7
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(8)}, nil).Once()
// iteration 4: finalized block is now block 8, we report events b6 and b7
lastFinalizedBlock = &types.Header{Number: big.NewInt(8)}
b8 := createEVMBlockFn(lastFinalizedBlock)
expectedBlocks = append(expectedBlocks, b8)
d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(8)).
Return(EVMBlocks{b6, b7}, false)
d.On("GetBlockHeader", mock.Anything, uint64(8)).Return(b8.EVMBlockHeader, false).Once()

// iteration 6: from block 9 to 19, no events
// iteration 5: from block 9 to 19, no events
lastFinalizedBlock = &types.Header{Number: big.NewInt(15)}
d.On("WaitForNewBlocks", mock.Anything, uint64(8)).
After(time.Millisecond * 100).
Return(uint64(19)).Once()
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(15)}, nil).Once()
d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)).
Return(EVMBlocks{}, false)

// iteration 7: last finalized block is now 20, no events, report empty block
// iteration 6: last finalized block is now 20, no events, report empty block
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)).
Return(EVMBlocks{}, false)

b20 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 20,
Hash: common.HexToHash("20"),
},
}
expectedBlocks = append(expectedBlocks, b20)
d.On("GetBlockHeader", mock.Anything, uint64(20)).Return(b20.EVMBlockHeader, false) // reporting empty finalized block
d.On("WaitForNewBlocks", mock.Anything, uint64(19)).
After(time.Millisecond * 100).
Return(uint64(20)).Once()
b19 := createEVMBlockFn(&types.Header{Number: big.NewInt(19)})
expectedBlocks = append(expectedBlocks, b19)
d.On("GetBlockHeader", mock.Anything, uint64(19)).Return(b19.EVMBlockHeader, false) // reporting empty finalized to block

// iteration 8: last finalized block is 21, no events
b20 := createEVMBlockFn(&types.Header{Number: big.NewInt(20)})
expectedBlocks = append(expectedBlocks, b20)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(21)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(20)).
Return(EVMBlocks{}, false)
d.On("GetBlockHeader", mock.Anything, uint64(20)).Return(b20.EVMBlockHeader, false) // reporting empty finalized to block

// iteration 9: last finalized block is 22, no events
d.On("WaitForNewBlocks", mock.Anything, uint64(20)).
After(time.Millisecond * 100).
Return(uint64(21)).Once()
b21 := createEVMBlockFn(&types.Header{Number: big.NewInt(21)})
expectedBlocks = append(expectedBlocks, b21)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(22)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(21)).
d.On("GetEventsByBlockRange", mock.Anything, uint64(21), uint64(21)).
Return(EVMBlocks{}, false)
d.On("GetBlockHeader", mock.Anything, uint64(21)).Return(b21.EVMBlockHeader, false) // reporting empty finalized to block

// iteration 10: last finalized block is 23, no events
d.On("WaitForNewBlocks", mock.Anything, uint64(21)).
After(time.Millisecond * 100).
Return(uint64(22)).Once()
b22 := createEVMBlockFn(&types.Header{Number: big.NewInt(22)})
expectedBlocks = append(expectedBlocks, b22)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(23)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(22)).
d.On("GetEventsByBlockRange", mock.Anything, uint64(22), uint64(22)).
Return(EVMBlocks{}, false)
d.On("GetBlockHeader", mock.Anything, uint64(22)).Return(b22.EVMBlockHeader, false) // reporting empty finalized to block

// iteration 11: last finalized block is still 23, no events
d.On("WaitForNewBlocks", mock.Anything, uint64(22)).
After(time.Millisecond * 100).
Return(uint64(23)).Once()
b23 := createEVMBlockFn(&types.Header{Number: big.NewInt(23)})
expectedBlocks = append(expectedBlocks, b23)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(23)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(23)).
d.On("GetEventsByBlockRange", mock.Anything, uint64(23), uint64(23)).
Return(EVMBlocks{}, false)
d.On("GetBlockHeader", mock.Anything, uint64(23)).Return(b23.EVMBlockHeader, false) // reporting empty finalized to block

// iteration 12: finalized block is 24, has events
d.On("WaitForNewBlocks", mock.Anything, uint64(23)).
After(time.Millisecond * 100).
Return(uint64(24)).Once()
b24 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 24,
Expand All @@ -317,8 +354,8 @@ func TestDownload(t *testing.T) {
Events: []interface{}{testEvent(common.HexToHash("24"))},
}
expectedBlocks = append(expectedBlocks, b24)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(24)}, nil).Twice()
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(24)).
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(24)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(24), uint64(24)).
Return(EVMBlocks{b24}, false)

// iteration 13: closing the downloader
Expand Down

0 comments on commit 3137fa6

Please sign in to comment.