Skip to content

Commit

Permalink
fix: syncer spamming RPC and reaching tip of chain (cherry-pick from …
Browse files Browse the repository at this point in the history
…CDK) (#177)
  • Loading branch information
joanestebanr authored Feb 3, 2025
1 parent e141db2 commit 17dca96
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 205 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ Licensed under either of
at your option.

The SPDX license identifier for this project is `MIT OR Apache-2.0`.

79 changes: 51 additions & 28 deletions sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error
type EVMDownloader struct {
syncBlockChunkSize uint64
EVMDownloaderInterface
log *log.Logger
finalizedBlockType etherman.BlockNumberFinality
log *log.Logger
finalizedBlockType etherman.BlockNumberFinality
stopDownloaderOnIterationN int
}

func NewEVMDownloader(
Expand Down Expand Up @@ -101,9 +102,16 @@ func NewEVMDownloader(
}, nil
}

// setStopDownloaderOnIterationN sets the block number to stop the downloader (just for unittest)
func (d *EVMDownloader) setStopDownloaderOnIterationN(iteration int) {
d.stopDownloaderOnIterationN = iteration
}

func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) {
lastBlock := d.WaitForNewBlocks(ctx, 0)

toBlock := fromBlock + d.syncBlockChunkSize
iteration := 0
reachTop := false
for {
select {
case <-ctx.Done():
Expand All @@ -112,53 +120,68 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
return
default:
}
d.log.Debugf("range: %d to %d, last block: %d", fromBlock, toBlock, lastBlock)

toBlock := fromBlock + d.syncBlockChunkSize
if toBlock > lastBlock {
toBlock = lastBlock
}

if fromBlock > toBlock {
if fromBlock > lastBlock || (reachTop && toBlock >= lastBlock) {
d.log.Debugf(
"waiting for new blocks, last block processed: %d, last block seen on L1: %d",
fromBlock-1, lastBlock,
"waiting for new blocks, current range: [%d to %d], last block seen: %d",
fromBlock, toBlock, lastBlock,
)
lastBlock = d.WaitForNewBlocks(ctx, fromBlock-1)
continue
}
lastBlock = d.WaitForNewBlocks(ctx, lastBlock)
d.log.Debugf("new last block seen: %d", lastBlock)

if fromBlock-toBlock < d.syncBlockChunkSize {
toBlock = fromBlock + d.syncBlockChunkSize
}
}
reachTop = false
lastFinalizedBlock, err := d.GetLastFinalizedBlock(ctx)
if err != nil {
d.log.Error("error getting last finalized block: ", err)
continue
}
// lastFinalizedBlock can't be > lastBlock
lastFinalizedBlockNumber := min(lastBlock, lastFinalizedBlock.Number.Uint64())

lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64()

d.log.Debugf("getting events from blocks %d to %d. lastFinalizedBlock: %d",
fromBlock, toBlock, lastFinalizedBlockNumber)
blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock)

requestToBlock := toBlock
if toBlock >= lastBlock {
requestToBlock = lastBlock
reachTop = true
}
d.log.Debugf("getting events from blocks [%d to %d] toBlock: %d. lastFinalizedBlock: %d lastBlock: %d",
fromBlock, requestToBlock, toBlock, lastFinalizedBlockNumber, lastBlock)
blocks := d.GetEventsByBlockRange(ctx, fromBlock, requestToBlock)
d.log.Debugf("result events from blocks [%d to %d] -> len(blocks)=%d",
fromBlock, requestToBlock, len(blocks))
if toBlock <= lastFinalizedBlockNumber {
d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber)
fromBlock = toBlock + 1

if blocks.Len() == 0 || blocks[blocks.Len()-1].Num < toBlock {
d.reportEmptyBlock(ctx, downloadedCh, toBlock, lastFinalizedBlockNumber)
}
fromBlock = toBlock + 1
toBlock = fromBlock + d.syncBlockChunkSize
} else {
d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber)

if blocks.Len() == 0 {
if lastFinalizedBlockNumber > fromBlock &&
lastFinalizedBlockNumber-fromBlock > d.syncBlockChunkSize {
d.reportEmptyBlock(ctx, downloadedCh, fromBlock+d.syncBlockChunkSize, lastFinalizedBlockNumber)
fromBlock += d.syncBlockChunkSize + 1
if lastFinalizedBlockNumber >= fromBlock {
emptyBlock := lastFinalizedBlockNumber
d.reportEmptyBlock(ctx, downloadedCh, emptyBlock, lastFinalizedBlockNumber)
fromBlock = emptyBlock + 1
toBlock = fromBlock + d.syncBlockChunkSize
} else {
// Extend range until find logs or reach the last finalized block
toBlock += d.syncBlockChunkSize
}
} else {
d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber)
fromBlock = blocks[blocks.Len()-1].Num + 1
toBlock = fromBlock + d.syncBlockChunkSize
}
}
iteration++
if d.stopDownloaderOnIterationN != 0 && iteration >= d.stopDownloaderOnIterationN {
d.log.Infof("stop downloader on iteration %d", iteration)
return
}
}
}

Expand Down
Loading

0 comments on commit 17dca96

Please sign in to comment.