Skip to content

Commit

Permalink
fix: downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Jan 20, 2025
1 parent 4c9af9c commit 00b33d0
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 83 deletions.
93 changes: 82 additions & 11 deletions sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)

type EthClienter interface {
Expand All @@ -23,9 +24,10 @@ type EthClienter interface {

type EVMDownloaderInterface interface {
WaitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64)
GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock
GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) EVMBlocks
GetLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log
GetBlockHeader(ctx context.Context, blockNum uint64) (EVMBlockHeader, bool)
GetLastFinalizedBlock(ctx context.Context) (*types.Header, error)
}

type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error
Expand Down Expand Up @@ -73,6 +75,9 @@ func NewEVMDownloader(

func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) {
lastBlock := d.WaitForNewBlocks(ctx, 0)
toBlock := fromBlock
automaticToBlockResize := true

for {
select {
case <-ctx.Done():
Expand All @@ -81,10 +86,16 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
return
default:
}
toBlock := fromBlock + d.syncBlockChunkSize
if toBlock > lastBlock {
toBlock = lastBlock

if automaticToBlockResize {
toBlock = fromBlock + d.syncBlockChunkSize
if toBlock > lastBlock {
toBlock = lastBlock
}
}

automaticToBlockResize = true // reset the flag

if fromBlock > toBlock {
d.log.Debugf(
"waiting for new blocks, last block processed %d, last block seen on L1 %d",
Expand All @@ -93,16 +104,29 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
lastBlock = d.WaitForNewBlocks(ctx, fromBlock-1)
continue
}

lastFinalizedBlock, err := d.GetLastFinalizedBlock(ctx)
if err != nil {
d.log.Error("error getting last finalized block: ", err)
continue
}

lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64()

d.log.Debugf("getting events from blocks %d to %d", fromBlock, toBlock)
blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock)
for _, b := range blocks {
d.log.Debugf("sending block %d to the driver (with events)", b.Num)
downloadedCh <- b

reportBlocksFn := func(numOfBlocksToReport int) {
for i := 0; i < numOfBlocksToReport; i++ {
d.log.Debugf("sending block %d to the driver (with events)", blocks[i].Num)
downloadedCh <- blocks[i]
}
}
if len(blocks) == 0 || blocks[len(blocks)-1].Num < toBlock {

reportEmptyBlockFn := func(blockNum uint64) {
// Indicate the last downloaded block if there are not events on it
d.log.Debugf("sending block %d to the driver (without events)", toBlock)
header, isCanceled := d.GetBlockHeader(ctx, toBlock)
header, isCanceled := d.GetBlockHeader(ctx, blockNum)
if isCanceled {
return
}
Expand All @@ -111,7 +135,50 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
EVMBlockHeader: header,
}
}
fromBlock = toBlock + 1

if blocks.Len() == 0 {
// we have no events, keep increasing the block range until we hit a log
if lastFinalizedBlockNumber > toBlock {
// we might be behind a lot, so go until last finalized block
toBlock = lastFinalizedBlockNumber + 1
} else {
toBlock++
}

if lastFinalizedBlockNumber-fromBlock >= d.syncBlockChunkSize {
// if we already got a lot of finalized blocks that are empty, report an empty block
// to the driver to indicate that we are still processing the chain
// this is mainly needed for tests
reportEmptyBlockFn(lastFinalizedBlockNumber)
fromBlock = lastFinalizedBlockNumber
}

automaticToBlockResize = false

continue
} else if blocks[blocks.Len()-1].Num <= lastFinalizedBlockNumber {
// if the last block we have logs for is less than or equal to the last finalized block,
// report all of the blocks without the need to report the last empty block, since it is finalized
// and we do not need to track it in the reorg detector
reportBlocksFn(blocks.Len())
fromBlock = lastFinalizedBlockNumber + 1
} else if blocks[len(blocks)-1].Num < toBlock {
// if we have logs in some of the blocks, and they are not all finalized,
// check if we have finalized blocks in gotten range, report them and
// set the from block from the last finalized block and keep increasing the range
lastFinalizedBlock, index, exists := blocks.LastFinalizedBlock(lastFinalizedBlockNumber)
if exists {
reportBlocksFn(index + 1) // num of blocks to report is index + 1 since index is zero based
fromBlock = lastFinalizedBlock + 1
continue
}
} else {
// if we have logs in the last block, just report all of them and continue
// reorg detector will handle the reorg since the last block has events,
// and we are not afraid to have missaligned hashes at this point
reportBlocksFn(blocks.Len())
fromBlock = toBlock + 1
}
}
}

Expand Down Expand Up @@ -149,6 +216,10 @@ func NewEVMDownloaderImplementation(
}
}

func (d *EVMDownloaderImplementation) GetLastFinalizedBlock(ctx context.Context) (*types.Header, error) {
return d.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.SafeBlockNumber)))
}

func (d *EVMDownloaderImplementation) WaitForNewBlocks(
ctx context.Context, lastBlockSeen uint64,
) (newLastBlock uint64) {
Expand All @@ -175,7 +246,7 @@ func (d *EVMDownloaderImplementation) WaitForNewBlocks(
}
}

func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock {
func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) EVMBlocks {
select {
case <-ctx.Done():
return nil
Expand Down
119 changes: 59 additions & 60 deletions sync/evmdownloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestGetEventsByBlockRange(t *testing.T) {
description string
inputLogs []types.Log
fromBlock, toBlock uint64
expectedBlocks []EVMBlock
expectedBlocks EVMBlocks
}
testCases := []testCase{}
ctx := context.Background()
Expand All @@ -47,7 +47,7 @@ func TestGetEventsByBlockRange(t *testing.T) {
inputLogs: []types.Log{},
fromBlock: 1,
toBlock: 3,
expectedBlocks: []EVMBlock{},
expectedBlocks: EVMBlocks{},
}
testCases = append(testCases, case0)

Expand All @@ -56,7 +56,7 @@ func TestGetEventsByBlockRange(t *testing.T) {
logsC1 := []types.Log{
*logC1,
}
blocksC1 := []EVMBlock{
blocksC1 := EVMBlocks{
{
EVMBlockHeader: EVMBlockHeader{
Num: logC1.BlockNumber,
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestGetEventsByBlockRange(t *testing.T) {
*logC3_3,
*logC3_4,
}
blocksC3 := []EVMBlock{
blocksC3 := EVMBlocks{
{
EVMBlockHeader: EVMBlockHeader{
Num: logC3_1.BlockNumber,
Expand Down Expand Up @@ -206,48 +206,37 @@ func TestDownload(t *testing.T) {
downloadCh := make(chan EVMBlock, 1)
ctx := context.Background()
ctx1, cancel := context.WithCancel(ctx)
expectedBlocks := []EVMBlock{}
expectedBlocks := EVMBlocks{}
dwnldr, _ := NewTestDownloader(t)
dwnldr.EVMDownloaderInterface = d

d.On("WaitForNewBlocks", mock.Anything, uint64(0)).
Return(uint64(1))
// iteratiion 0:

// iteration 0:
// last block is 1, download that block (no events and wait)
b1 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 1,
Hash: common.HexToHash("01"),
},
}
expectedBlocks = append(expectedBlocks, b1)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(1)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(1)).
Return([]EVMBlock{}, false)
d.On("GetBlockHeader", mock.Anything, uint64(1)).
Return(b1.EVMBlockHeader, false)
Return(EVMBlocks{}, false).Once()

// iteration 1: wait for next block to be created
d.On("WaitForNewBlocks", mock.Anything, uint64(1)).
After(time.Millisecond * 100).
Return(uint64(2)).Once()

// iteration 2: block 2 has events
// iteration 1: block 2 has events
b2 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 2,
Hash: common.HexToHash("02"),
},
}
expectedBlocks = append(expectedBlocks, b2)
d.On("GetEventsByBlockRange", mock.Anything, uint64(2), uint64(2)).
Return([]EVMBlock{b2}, false)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(2)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(2)).
Return(EVMBlocks{b2}, false).Once()

// iteration 3: wait for next block to be created (jump to block 8)
// iteration 2: wait for next block to be created (jump to block 8)
d.On("WaitForNewBlocks", mock.Anything, uint64(2)).
After(time.Millisecond * 100).
Return(uint64(8)).Once()

// iteration 4: blocks 6 and 7 have events
// iteration 3: blocks 6 and 7 have events, but last finalized block is 5
b6 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 6,
Expand All @@ -262,52 +251,62 @@ func TestDownload(t *testing.T) {
},
Events: []interface{}{"07"},
}
b8 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 8,
Hash: common.HexToHash("08"),
},
}
expectedBlocks = append(expectedBlocks, b6, b7, b8)
expectedBlocks = append(expectedBlocks, b6, b7)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(5)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(8)).
Return([]EVMBlock{b6, b7}, false)
d.On("GetBlockHeader", mock.Anything, uint64(8)).
Return(b8.EVMBlockHeader, false)
Return(EVMBlocks{b6, b7}, false)

// iteration 5: wait for next block to be created (jump to block 30)
// iteration 5: finalized block is now block 8, we report events b6 and b7
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(8)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(8)).
Return(EVMBlocks{b6, b7}, false)

// iteration 6: from block 9 to 19, no events
d.On("WaitForNewBlocks", mock.Anything, uint64(8)).
After(time.Millisecond * 100).
Return(uint64(30)).Once()
Return(uint64(19)).Once()
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(15)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)).
Return(EVMBlocks{}, false)

// iteration 6: from block 9 to 19, no events
b19 := EVMBlock{
// iteration 7: last finalized block is now 20, no events, report empty block
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(20)).
Return(EVMBlocks{}, false)

b20 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 19,
Hash: common.HexToHash("19"),
Num: 20,
Hash: common.HexToHash("20"),
},
}
expectedBlocks = append(expectedBlocks, b19)
d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)).
Return([]EVMBlock{}, false)
d.On("GetBlockHeader", mock.Anything, uint64(19)).
Return(b19.EVMBlockHeader, false)

// iteration 7: from block 20 to 30, events on last block
b30 := EVMBlock{
expectedBlocks = append(expectedBlocks, b20)
d.On("GetBlockHeader", mock.Anything, uint64(20)).Return(b20.EVMBlockHeader, false) // reporting empty finalized block

// iteration 8, 9 and 10: last finalized block is still 20, no events
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Times(3)
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(21)).
Return(EVMBlocks{}, false)
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(22)).
Return(EVMBlocks{}, false)
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(23)).
Return(EVMBlocks{}, false)

// iteration 11: from block 20 to 24, events on last block
b24 := EVMBlock{
EVMBlockHeader: EVMBlockHeader{
Num: 30,
Hash: common.HexToHash("30"),
Num: 24,
Hash: common.HexToHash("24"),
},
Events: []interface{}{testEvent(common.HexToHash("30"))},
Events: []interface{}{testEvent(common.HexToHash("24"))},
}
expectedBlocks = append(expectedBlocks, b30)
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(30)).
Return([]EVMBlock{b30}, false)
expectedBlocks = append(expectedBlocks, b24)
d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Once()
d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(24)).
Return(EVMBlocks{b24}, false)

// iteration 8: wait for next block to be created (jump to block 35)
d.On("WaitForNewBlocks", mock.Anything, uint64(30)).
After(time.Millisecond * 100).
Return(uint64(35)).Once()
// iteration 12: closing the downloader
d.On("WaitForNewBlocks", mock.Anything, uint64(24)).Return(uint64(25)).After(time.Millisecond * 100).Once()

go dwnldr.Download(ctx1, 0, downloadCh)
for _, expectedBlock := range expectedBlocks {
Expand Down
5 changes: 0 additions & 5 deletions sync/evmdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ import (
"github.com/ethereum/go-ethereum/common"
)

type evmDownloaderFull interface {
EVMDownloaderInterface
downloader
}

type downloader interface {
Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock)
}
Expand Down
16 changes: 16 additions & 0 deletions sync/evmtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@ package sync

import "github.com/ethereum/go-ethereum/common"

type EVMBlocks []EVMBlock

func (e EVMBlocks) Len() int {
return len(e)
}

func (e EVMBlocks) LastFinalizedBlock(lastFinalizedBlockOnNetwork uint64) (uint64, int, bool) {
for i := len(e) - 1; i >= 0; i-- {
if e[i].Num <= lastFinalizedBlockOnNetwork {
return e[i].Num, i, true
}
}

return 0, 0, false // no finalized block found
}

type EVMBlock struct {
EVMBlockHeader
Events []interface{}
Expand Down
Loading

0 comments on commit 00b33d0

Please sign in to comment.