Skip to content

Commit

Permalink
Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman committed Aug 21, 2024
1 parent 9913d87 commit 6028440
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 53 deletions.
101 changes: 48 additions & 53 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ func New(client EthClient, dbPath string) (*ReorgDetector, error) {
}

func (rd *ReorgDetector) Start(ctx context.Context) error {
// Load tracked blocks from the DB
// Load canonical chain from the last finalized block
if err := rd.loadCanonicalChain(ctx); err != nil {
return fmt.Errorf("failed to load canonical chain: %w", err)
}

// Load and process tracked blocks from the DB
if err := rd.loadAndProcessTrackedBlocks(ctx); err != nil {
return fmt.Errorf("failed to load and process tracked blocks: %w", err)
}
Expand All @@ -100,23 +105,6 @@ func (rd *ReorgDetector) Start(ctx context.Context) error {
return nil
}

func (rd *ReorgDetector) Subscribe(id string) (*Subscription, error) {
rd.subscriptionsLock.Lock()
defer rd.subscriptionsLock.Unlock()

if sub, ok := rd.subscriptions[id]; ok {
return sub, nil
}

sub := &Subscription{
FirstReorgedBlock: make(chan uint64),
ReorgProcessed: make(chan bool),
}
rd.subscriptions[id] = sub

return sub, nil
}

func (rd *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash, parentHash common.Hash) error {
rd.subscriptionsLock.RLock()
if sub, ok := rd.subscriptions[id]; !ok {
Expand Down Expand Up @@ -248,6 +236,8 @@ func (rd *ReorgDetector) onNewHeader(ctx context.Context, header *types.Header)

// rebuildCanonicalChain rebuilds the canonical chain from the given block number to the given block number.
func (rd *ReorgDetector) rebuildCanonicalChain(ctx context.Context, from, to uint64) error {
// TODO: Potentially rebuild from the latest finalized block

for i := from; i <= to; i++ {
blockHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(i)))
if err != nil {
Expand All @@ -264,6 +254,40 @@ func (rd *ReorgDetector) rebuildCanonicalChain(ctx context.Context, from, to uin
return nil
}

// loadCanonicalChain loads canonical chain from the latest finalized block till the latest one
func (rd *ReorgDetector) loadCanonicalChain(ctx context.Context) error {
// Get the latest finalized block
lastFinalisedBlock, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber)))
if err != nil {
return err
}

// Get the latest block
latestBlock, err := rd.client.HeaderByNumber(ctx, nil)
if err != nil {
return err
}

rd.canonicalBlocksLock.Lock()
defer rd.canonicalBlocksLock.Unlock()

// Load the canonical chain from the last finalized block till the latest block
for i := lastFinalisedBlock.Number.Uint64(); i <= latestBlock.Number.Uint64(); i++ {
blockHeader, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(i)))
if err != nil {
return fmt.Errorf("failed to fetch block header for block number %d: %w", i, err)
}

rd.canonicalBlocks[blockHeader.Number.Uint64()] = block{
Num: blockHeader.Number.Uint64(),
Hash: blockHeader.Hash(),
ParentHash: blockHeader.ParentHash,
}
}

return nil
}

// loadAndProcessTrackedBlocks loads tracked blocks from the DB and checks for reorgs. Loads in memory.
func (rd *ReorgDetector) loadAndProcessTrackedBlocks(ctx context.Context) error {
rd.trackedBlocksLock.Lock()
Expand All @@ -282,8 +306,6 @@ func (rd *ReorgDetector) loadAndProcessTrackedBlocks(ctx context.Context) error
return err
}

blocksGotten := make(map[uint64]common.Hash, 0)

for id, blocks := range rd.trackedBlocks {
rd.subscriptionsLock.Lock()
rd.subscriptions[id] = &Subscription{
Expand All @@ -299,22 +321,22 @@ func (rd *ReorgDetector) loadAndProcessTrackedBlocks(ctx context.Context) error

var (
lastTrackedBlock uint64
block block
actualBlockHash common.Hash
ok bool
)

sortedBlocks := blocks.getSorted()
lastTrackedBlock = sortedBlocks[len(blocks)-1].Num

for _, block = range sortedBlocks {
if actualBlockHash, ok = blocksGotten[block.Num]; !ok {
actualBlock, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(block.Num)))
for _, block := range sortedBlocks {
if actualBlock, ok := rd.canonicalBlocks[block.Num]; !ok {
header, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(block.Num)))
if err != nil {
return err
}

actualBlockHash = actualBlock.Hash()
actualBlockHash = header.Hash()
} else {
actualBlockHash = actualBlock.Hash
}

if actualBlockHash != block.Hash {
Expand All @@ -338,30 +360,3 @@ func (rd *ReorgDetector) loadAndProcessTrackedBlocks(ctx context.Context) error

return nil
}

func (rd *ReorgDetector) notifySubscribers(startingBlock block) {
rd.subscriptionsLock.RLock()
for _, sub := range rd.subscriptions {
sub.pendingReorgsToBeProcessed.Add(1)
go func(sub *Subscription) {
sub.FirstReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
sub.pendingReorgsToBeProcessed.Done()
}(sub)
}
rd.subscriptionsLock.RUnlock()
}

func (rd *ReorgDetector) notifySubscriber(id string, startingBlock block) {
rd.subscriptionsLock.RLock()
subscriber, ok := rd.subscriptions[id]
if ok {
subscriber.pendingReorgsToBeProcessed.Add(1)
go func(sub *Subscription) {
sub.FirstReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
sub.pendingReorgsToBeProcessed.Done()
}(subscriber)
}
rd.subscriptionsLock.RUnlock()
}
45 changes: 45 additions & 0 deletions reorgdetector/reorgdetector_sub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package reorgdetector

func (rd *ReorgDetector) Subscribe(id string) (*Subscription, error) {
rd.subscriptionsLock.Lock()
defer rd.subscriptionsLock.Unlock()

if sub, ok := rd.subscriptions[id]; ok {
return sub, nil
}

sub := &Subscription{
FirstReorgedBlock: make(chan uint64),
ReorgProcessed: make(chan bool),
}
rd.subscriptions[id] = sub

return sub, nil
}

func (rd *ReorgDetector) notifySubscribers(startingBlock block) {
rd.subscriptionsLock.RLock()
for _, sub := range rd.subscriptions {
sub.pendingReorgsToBeProcessed.Add(1)
go func(sub *Subscription) {
sub.FirstReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
sub.pendingReorgsToBeProcessed.Done()
}(sub)
}
rd.subscriptionsLock.RUnlock()
}

func (rd *ReorgDetector) notifySubscriber(id string, startingBlock block) {
rd.subscriptionsLock.RLock()
subscriber, ok := rd.subscriptions[id]
if ok {
subscriber.pendingReorgsToBeProcessed.Add(1)
go func(sub *Subscription) {
sub.FirstReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
sub.pendingReorgsToBeProcessed.Done()
}(subscriber)
}
rd.subscriptionsLock.RUnlock()
}

0 comments on commit 6028440

Please sign in to comment.