From fc384528230ceb5b536b0c69bb0c5c00e5e97c35 Mon Sep 17 00:00:00 2001 From: begmaroman <begmaroman@gmail.com> Date: Thu, 22 Aug 2024 18:43:12 +0100 Subject: [PATCH] Minor update --- reorgdetector/reorgdetector.go | 81 ++++------------------------- reorgdetector/reorgdetector_test.go | 5 ++ 2 files changed, 14 insertions(+), 72 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index df2eba954..9438eb6e0 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -7,8 +7,6 @@ import ( "sync" "time" - "golang.org/x/sync/errgroup" - "github.com/0xPolygon/cdk/log" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -16,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "golang.org/x/sync/errgroup" ) type EthClient interface { @@ -28,8 +27,6 @@ type ReorgDetector struct { client EthClient db kv.RwDB - canonicalBlocks *headersList - trackedBlocksLock sync.RWMutex trackedBlocks map[string]*headersList @@ -50,12 +47,11 @@ func New(client EthClient, dbPath string) (*ReorgDetector, error) { } return &ReorgDetector{ - client: client, - db: db, - canonicalBlocks: newHeadersList(), - trackedBlocks: make(map[string]*headersList), - subscriptions: make(map[string]*Subscription), - notifiedReorgs: make(map[string]map[uint64]struct{}), + client: client, + db: db, + trackedBlocks: make(map[string]*headersList), + subscriptions: make(map[string]*Subscription), + notifiedReorgs: make(map[string]map[uint64]struct{}), }, nil } @@ -65,21 +61,6 @@ func (rd *ReorgDetector) Start(ctx context.Context) (err error) { return fmt.Errorf("failed to load tracked headers: %w", err) } - // Initially load a full canonical chain - if err = rd.loadCanonicalChain(ctx); err != nil { - log.Errorf("failed to load canonical chain: %v", err) - } - - // Continuously load canonical chain - go func() { - ticker := time.NewTicker(time.Second * 2) // TODO: Configure it - for range ticker.C { - if err = rd.loadCanonicalChain(ctx); err != nil { - log.Errorf("failed to load canonical chain: %v", err) - } - } - }() - // Continuously check reorgs in tracked by subscribers blocks go func() { ticker := time.NewTicker(time.Second) // TODO: Configure it @@ -133,6 +114,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { headers := hdrs.getSorted() for _, hdr := range headers { // Get the actual header from the network + // TODO: Cache it while iterating. currentHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(hdr.Num))) if err != nil { return fmt.Errorf("failed to get the header: %w", err) @@ -141,10 +123,11 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { // Check if the block hash matches with the actual block hash if hdr.Hash == currentHeader.Hash() { // Delete block from the tracked blocks list if it is less than or equal to the last finalized block - // and hashes matches + // and hashes matches. If higher than finalized block, we assume a reorg still might happen. if hdr.Num <= lastFinalisedBlock.Number.Uint64() { hdrs.removeRange(hdr.Num, hdr.Num) } + continue } @@ -170,52 +153,6 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { return errGroup.Wait() } -// loadCanonicalChain loads canonical chain from the latest finalized block till the latest one -func (rd *ReorgDetector) loadCanonicalChain(ctx context.Context) error { - // Get the latest finalized block - lastFinalisedBlock, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) - if err != nil { - return err - } - - // Get the latest block - latestBlock, err := rd.client.HeaderByNumber(ctx, nil) - if err != nil { - return err - } - - // Start from the last stored block if it less than the last finalized one - startFromBlock := lastFinalisedBlock.Number.Uint64() - if sortedBlocks := rd.canonicalBlocks.getSorted(); len(sortedBlocks) > 0 { - lastTrackedBlock := sortedBlocks[rd.canonicalBlocks.len()-1] - if lastTrackedBlock.Num < startFromBlock { - startFromBlock = lastTrackedBlock.Num - - startHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(startFromBlock))) - if err != nil { - return fmt.Errorf("failed to fetch the start header %d: %w", startFromBlock, err) - } - - if startHeader.Hash() != lastTrackedBlock.Hash { - // Reorg happened, find the first reorg block to - } - } - } - - // Load the canonical chain from the last finalized block till the latest block - for i := startFromBlock; i <= latestBlock.Number.Uint64(); i++ { - blockHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(i))) - if err != nil { - return fmt.Errorf("failed to fetch block header for block number %d: %w", i, err) - } - - // Add the block to the canonical chain - rd.canonicalBlocks.add(newHeader(blockHeader.Number.Uint64(), blockHeader.Hash())) - } - - return nil -} - // loadTrackedHeaders loads tracked headers from the DB and stores them in memory func (rd *ReorgDetector) loadTrackedHeaders(ctx context.Context) (err error) { rd.trackedBlocksLock.Lock() diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index eca7ebd98..e64b89b4b 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -94,6 +94,7 @@ func Test_ReorgDetector(t *testing.T) { err = reorgDetector.AddBlockToTrack(ctx, subID, header.Number.Uint64(), header.Hash()) require.NoError(t, err) trackedBlocks[headerNumber] = header.Hash() + fmt.Println("added block", time.Now(), headerNumber) } } @@ -106,6 +107,8 @@ func Test_ReorgDetector(t *testing.T) { err = clientL1.Fork(reorgBlock.Hash()) require.NoError(t, err) + + fmt.Println("reorg happened", time.Now(), headerNumber) } } @@ -131,6 +134,8 @@ func Test_ReorgDetector(t *testing.T) { firstReorgedBlock := <-reorgSub.ReorgedBlock reorgSub.ReorgProcessed <- true + fmt.Println("firstReorgedBlock", firstReorgedBlock) + processed, ok := expectReorgOn[firstReorgedBlock] require.True(t, ok) require.False(t, processed)