Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: syncer spamming RPC and reaching tip of chain (cherry-pick from CDK) #177

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ Licensed under either of
at your option.

The SPDX license identifier for this project is `MIT OR Apache-2.0`.

79 changes: 51 additions & 28 deletions sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error
type EVMDownloader struct {
syncBlockChunkSize uint64
EVMDownloaderInterface
log *log.Logger
finalizedBlockType etherman.BlockNumberFinality
log *log.Logger
finalizedBlockType etherman.BlockNumberFinality
stopDownloaderOnIterationN int
}

func NewEVMDownloader(
Expand Down Expand Up @@ -101,9 +102,16 @@ func NewEVMDownloader(
}, nil
}

// setStopDownloaderOnIterationN sets the block number to stop the downloader (just for unittest)
func (d *EVMDownloader) setStopDownloaderOnIterationN(iteration int) {
d.stopDownloaderOnIterationN = iteration
}

func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) {
lastBlock := d.WaitForNewBlocks(ctx, 0)

toBlock := fromBlock + d.syncBlockChunkSize
iteration := 0
reachTop := false
for {
select {
case <-ctx.Done():
Expand All @@ -112,53 +120,68 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
return
default:
}
d.log.Debugf("range: %d to %d, last block: %d", fromBlock, toBlock, lastBlock)

toBlock := fromBlock + d.syncBlockChunkSize
if toBlock > lastBlock {
toBlock = lastBlock
}

if fromBlock > toBlock {
if fromBlock > lastBlock || (reachTop && toBlock >= lastBlock) {
d.log.Debugf(
"waiting for new blocks, last block processed: %d, last block seen on L1: %d",
fromBlock-1, lastBlock,
"waiting for new blocks, current range: [%d to %d], last block seen: %d",
fromBlock, toBlock, lastBlock,
)
lastBlock = d.WaitForNewBlocks(ctx, fromBlock-1)
continue
}
lastBlock = d.WaitForNewBlocks(ctx, lastBlock)
d.log.Debugf("new last block seen: %d", lastBlock)

if fromBlock-toBlock < d.syncBlockChunkSize {
toBlock = fromBlock + d.syncBlockChunkSize
}
}
reachTop = false
lastFinalizedBlock, err := d.GetLastFinalizedBlock(ctx)
if err != nil {
d.log.Error("error getting last finalized block: ", err)
continue
}
// lastFinalizedBlock can't be > lastBlock
lastFinalizedBlockNumber := min(lastBlock, lastFinalizedBlock.Number.Uint64())

lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64()

d.log.Debugf("getting events from blocks %d to %d. lastFinalizedBlock: %d",
fromBlock, toBlock, lastFinalizedBlockNumber)
blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock)

requestToBlock := toBlock
if toBlock >= lastBlock {
requestToBlock = lastBlock
reachTop = true
}
d.log.Debugf("getting events from blocks [%d to %d] toBlock: %d. lastFinalizedBlock: %d lastBlock: %d",
fromBlock, requestToBlock, toBlock, lastFinalizedBlockNumber, lastBlock)
blocks := d.GetEventsByBlockRange(ctx, fromBlock, requestToBlock)
d.log.Debugf("result events from blocks [%d to %d] -> len(blocks)=%d",
fromBlock, requestToBlock, len(blocks))
if toBlock <= lastFinalizedBlockNumber {
d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber)
fromBlock = toBlock + 1

if blocks.Len() == 0 || blocks[blocks.Len()-1].Num < toBlock {
d.reportEmptyBlock(ctx, downloadedCh, toBlock, lastFinalizedBlockNumber)
}
fromBlock = toBlock + 1
toBlock = fromBlock + d.syncBlockChunkSize
} else {
d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber)

if blocks.Len() == 0 {
if lastFinalizedBlockNumber > fromBlock &&
lastFinalizedBlockNumber-fromBlock > d.syncBlockChunkSize {
d.reportEmptyBlock(ctx, downloadedCh, fromBlock+d.syncBlockChunkSize, lastFinalizedBlockNumber)
fromBlock += d.syncBlockChunkSize + 1
if lastFinalizedBlockNumber >= fromBlock {
emptyBlock := lastFinalizedBlockNumber
d.reportEmptyBlock(ctx, downloadedCh, emptyBlock, lastFinalizedBlockNumber)
fromBlock = emptyBlock + 1
toBlock = fromBlock + d.syncBlockChunkSize
} else {
// Extend range until find logs or reach the last finalized block
toBlock += d.syncBlockChunkSize
}
} else {
d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber)
fromBlock = blocks[blocks.Len()-1].Num + 1
toBlock = fromBlock + d.syncBlockChunkSize
}
}
iteration++
if d.stopDownloaderOnIterationN != 0 && iteration >= d.stopDownloaderOnIterationN {
d.log.Infof("stop downloader on iteration %d", iteration)
return
}
}
}

Expand Down
Loading
Loading