Skip to content

Commit

Permalink
Minor update
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman committed Aug 22, 2024
1 parent c723407 commit fc38452
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 72 deletions.
81 changes: 9 additions & 72 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/0xPolygon/cdk/log"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"golang.org/x/sync/errgroup"
)

type EthClient interface {
Expand All @@ -28,8 +27,6 @@ type ReorgDetector struct {
client EthClient
db kv.RwDB

canonicalBlocks *headersList

trackedBlocksLock sync.RWMutex
trackedBlocks map[string]*headersList

Expand All @@ -50,12 +47,11 @@ func New(client EthClient, dbPath string) (*ReorgDetector, error) {
}

return &ReorgDetector{
client: client,
db: db,
canonicalBlocks: newHeadersList(),
trackedBlocks: make(map[string]*headersList),
subscriptions: make(map[string]*Subscription),
notifiedReorgs: make(map[string]map[uint64]struct{}),
client: client,
db: db,
trackedBlocks: make(map[string]*headersList),
subscriptions: make(map[string]*Subscription),
notifiedReorgs: make(map[string]map[uint64]struct{}),
}, nil
}

Expand All @@ -65,21 +61,6 @@ func (rd *ReorgDetector) Start(ctx context.Context) (err error) {
return fmt.Errorf("failed to load tracked headers: %w", err)
}

// Initially load a full canonical chain
if err = rd.loadCanonicalChain(ctx); err != nil {
log.Errorf("failed to load canonical chain: %v", err)
}

// Continuously load canonical chain
go func() {
ticker := time.NewTicker(time.Second * 2) // TODO: Configure it
for range ticker.C {
if err = rd.loadCanonicalChain(ctx); err != nil {
log.Errorf("failed to load canonical chain: %v", err)
}
}
}()

// Continuously check reorgs in tracked by subscribers blocks
go func() {
ticker := time.NewTicker(time.Second) // TODO: Configure it
Expand Down Expand Up @@ -133,6 +114,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
headers := hdrs.getSorted()
for _, hdr := range headers {
// Get the actual header from the network
// TODO: Cache it while iterating.
currentHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(hdr.Num)))
if err != nil {
return fmt.Errorf("failed to get the header: %w", err)
Expand All @@ -141,10 +123,11 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
// Check if the block hash matches with the actual block hash
if hdr.Hash == currentHeader.Hash() {
// Delete block from the tracked blocks list if it is less than or equal to the last finalized block
// and hashes matches
// and hashes matches. If higher than finalized block, we assume a reorg still might happen.
if hdr.Num <= lastFinalisedBlock.Number.Uint64() {
hdrs.removeRange(hdr.Num, hdr.Num)
}

continue
}

Expand All @@ -170,52 +153,6 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
return errGroup.Wait()
}

// loadCanonicalChain loads canonical chain from the latest finalized block till the latest one
func (rd *ReorgDetector) loadCanonicalChain(ctx context.Context) error {
// Get the latest finalized block
lastFinalisedBlock, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber)))
if err != nil {
return err
}

// Get the latest block
latestBlock, err := rd.client.HeaderByNumber(ctx, nil)
if err != nil {
return err
}

// Start from the last stored block if it less than the last finalized one
startFromBlock := lastFinalisedBlock.Number.Uint64()
if sortedBlocks := rd.canonicalBlocks.getSorted(); len(sortedBlocks) > 0 {
lastTrackedBlock := sortedBlocks[rd.canonicalBlocks.len()-1]
if lastTrackedBlock.Num < startFromBlock {
startFromBlock = lastTrackedBlock.Num

startHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(startFromBlock)))
if err != nil {
return fmt.Errorf("failed to fetch the start header %d: %w", startFromBlock, err)
}

if startHeader.Hash() != lastTrackedBlock.Hash {
// Reorg happened, find the first reorg block to
}
}
}

// Load the canonical chain from the last finalized block till the latest block
for i := startFromBlock; i <= latestBlock.Number.Uint64(); i++ {
blockHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(i)))
if err != nil {
return fmt.Errorf("failed to fetch block header for block number %d: %w", i, err)
}

// Add the block to the canonical chain
rd.canonicalBlocks.add(newHeader(blockHeader.Number.Uint64(), blockHeader.Hash()))
}

return nil
}

// loadTrackedHeaders loads tracked headers from the DB and stores them in memory
func (rd *ReorgDetector) loadTrackedHeaders(ctx context.Context) (err error) {
rd.trackedBlocksLock.Lock()
Expand Down
5 changes: 5 additions & 0 deletions reorgdetector/reorgdetector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func Test_ReorgDetector(t *testing.T) {
err = reorgDetector.AddBlockToTrack(ctx, subID, header.Number.Uint64(), header.Hash())
require.NoError(t, err)
trackedBlocks[headerNumber] = header.Hash()
fmt.Println("added block", time.Now(), headerNumber)
}
}

Expand All @@ -106,6 +107,8 @@ func Test_ReorgDetector(t *testing.T) {

err = clientL1.Fork(reorgBlock.Hash())
require.NoError(t, err)

fmt.Println("reorg happened", time.Now(), headerNumber)
}
}

Expand All @@ -131,6 +134,8 @@ func Test_ReorgDetector(t *testing.T) {
firstReorgedBlock := <-reorgSub.ReorgedBlock
reorgSub.ReorgProcessed <- true

fmt.Println("firstReorgedBlock", firstReorgedBlock)

processed, ok := expectReorgOn[firstReorgedBlock]
require.True(t, ok)
require.False(t, processed)
Expand Down

0 comments on commit fc38452

Please sign in to comment.