diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 3c5317ba..c07306b2 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -89,7 +89,12 @@ func New(client EthClient, dbPath string) (*ReorgDetector, error) { } func (rd *ReorgDetector) Start(ctx context.Context) error { - // Load tracked blocks from the DB + // Load canonical chain from the last finalized block + if err := rd.loadCanonicalChain(ctx); err != nil { + return fmt.Errorf("failed to load canonical chain: %w", err) + } + + // Load and process tracked blocks from the DB if err := rd.loadAndProcessTrackedBlocks(ctx); err != nil { return fmt.Errorf("failed to load and process tracked blocks: %w", err) } @@ -100,23 +105,6 @@ func (rd *ReorgDetector) Start(ctx context.Context) error { return nil } -func (rd *ReorgDetector) Subscribe(id string) (*Subscription, error) { - rd.subscriptionsLock.Lock() - defer rd.subscriptionsLock.Unlock() - - if sub, ok := rd.subscriptions[id]; ok { - return sub, nil - } - - sub := &Subscription{ - FirstReorgedBlock: make(chan uint64), - ReorgProcessed: make(chan bool), - } - rd.subscriptions[id] = sub - - return sub, nil -} - func (rd *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash, parentHash common.Hash) error { rd.subscriptionsLock.RLock() if sub, ok := rd.subscriptions[id]; !ok { @@ -248,6 +236,8 @@ func (rd *ReorgDetector) onNewHeader(ctx context.Context, header *types.Header) // rebuildCanonicalChain rebuilds the canonical chain from the given block number to the given block number. func (rd *ReorgDetector) rebuildCanonicalChain(ctx context.Context, from, to uint64) error { + // TODO: Potentially rebuild from the latest finalized block + for i := from; i <= to; i++ { blockHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(i))) if err != nil { @@ -264,6 +254,40 @@ func (rd *ReorgDetector) rebuildCanonicalChain(ctx context.Context, from, to uin return nil } +// loadCanonicalChain loads canonical chain from the latest finalized block till the latest one +func (rd *ReorgDetector) loadCanonicalChain(ctx context.Context) error { + // Get the latest finalized block + lastFinalisedBlock, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + return err + } + + // Get the latest block + latestBlock, err := rd.client.HeaderByNumber(ctx, nil) + if err != nil { + return err + } + + rd.canonicalBlocksLock.Lock() + defer rd.canonicalBlocksLock.Unlock() + + // Load the canonical chain from the last finalized block till the latest block + for i := lastFinalisedBlock.Number.Uint64(); i <= latestBlock.Number.Uint64(); i++ { + blockHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(i))) + if err != nil { + return fmt.Errorf("failed to fetch block header for block number %d: %w", i, err) + } + + rd.canonicalBlocks[blockHeader.Number.Uint64()] = block{ + Num: blockHeader.Number.Uint64(), + Hash: blockHeader.Hash(), + ParentHash: blockHeader.ParentHash, + } + } + + return nil +} + // loadAndProcessTrackedBlocks loads tracked blocks from the DB and checks for reorgs. Loads in memory. func (rd *ReorgDetector) loadAndProcessTrackedBlocks(ctx context.Context) error { rd.trackedBlocksLock.Lock() @@ -282,8 +306,6 @@ func (rd *ReorgDetector) loadAndProcessTrackedBlocks(ctx context.Context) error return err } - blocksGotten := make(map[uint64]common.Hash, 0) - for id, blocks := range rd.trackedBlocks { rd.subscriptionsLock.Lock() rd.subscriptions[id] = &Subscription{ @@ -299,22 +321,22 @@ func (rd *ReorgDetector) loadAndProcessTrackedBlocks(ctx context.Context) error var ( lastTrackedBlock uint64 - block block actualBlockHash common.Hash - ok bool ) sortedBlocks := blocks.getSorted() lastTrackedBlock = sortedBlocks[len(blocks)-1].Num - for _, block = range sortedBlocks { - if actualBlockHash, ok = blocksGotten[block.Num]; !ok { - actualBlock, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(block.Num))) + for _, block := range sortedBlocks { + if actualBlock, ok := rd.canonicalBlocks[block.Num]; !ok { + header, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(block.Num))) if err != nil { return err } - actualBlockHash = actualBlock.Hash() + actualBlockHash = header.Hash() + } else { + actualBlockHash = actualBlock.Hash } if actualBlockHash != block.Hash { @@ -338,30 +360,3 @@ func (rd *ReorgDetector) loadAndProcessTrackedBlocks(ctx context.Context) error return nil } - -func (rd *ReorgDetector) notifySubscribers(startingBlock block) { - rd.subscriptionsLock.RLock() - for _, sub := range rd.subscriptions { - sub.pendingReorgsToBeProcessed.Add(1) - go func(sub *Subscription) { - sub.FirstReorgedBlock <- startingBlock.Num - <-sub.ReorgProcessed - sub.pendingReorgsToBeProcessed.Done() - }(sub) - } - rd.subscriptionsLock.RUnlock() -} - -func (rd *ReorgDetector) notifySubscriber(id string, startingBlock block) { - rd.subscriptionsLock.RLock() - subscriber, ok := rd.subscriptions[id] - if ok { - subscriber.pendingReorgsToBeProcessed.Add(1) - go func(sub *Subscription) { - sub.FirstReorgedBlock <- startingBlock.Num - <-sub.ReorgProcessed - sub.pendingReorgsToBeProcessed.Done() - }(subscriber) - } - rd.subscriptionsLock.RUnlock() -} diff --git a/reorgdetector/reorgdetector_sub.go b/reorgdetector/reorgdetector_sub.go new file mode 100644 index 00000000..71bdf384 --- /dev/null +++ b/reorgdetector/reorgdetector_sub.go @@ -0,0 +1,45 @@ +package reorgdetector + +func (rd *ReorgDetector) Subscribe(id string) (*Subscription, error) { + rd.subscriptionsLock.Lock() + defer rd.subscriptionsLock.Unlock() + + if sub, ok := rd.subscriptions[id]; ok { + return sub, nil + } + + sub := &Subscription{ + FirstReorgedBlock: make(chan uint64), + ReorgProcessed: make(chan bool), + } + rd.subscriptions[id] = sub + + return sub, nil +} + +func (rd *ReorgDetector) notifySubscribers(startingBlock block) { + rd.subscriptionsLock.RLock() + for _, sub := range rd.subscriptions { + sub.pendingReorgsToBeProcessed.Add(1) + go func(sub *Subscription) { + sub.FirstReorgedBlock <- startingBlock.Num + <-sub.ReorgProcessed + sub.pendingReorgsToBeProcessed.Done() + }(sub) + } + rd.subscriptionsLock.RUnlock() +} + +func (rd *ReorgDetector) notifySubscriber(id string, startingBlock block) { + rd.subscriptionsLock.RLock() + subscriber, ok := rd.subscriptions[id] + if ok { + subscriber.pendingReorgsToBeProcessed.Add(1) + go func(sub *Subscription) { + sub.FirstReorgedBlock <- startingBlock.Num + <-sub.ReorgProcessed + sub.pendingReorgsToBeProcessed.Done() + }(subscriber) + } + rd.subscriptionsLock.RUnlock() +}