Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
custom blocktracker
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcrevar committed Sep 19, 2023
1 parent 4fbe4ba commit accc3d6
Show file tree
Hide file tree
Showing 6 changed files with 618 additions and 6 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,5 @@ require (
gotest.tools/v3 v3.0.2 // indirect
inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a // indirect
)

replace github.com/umbracle/ethgo => github.com/igorcrevar/ethgo v0.1.1000
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ github.com/huin/goupnp v1.1.0 h1:gEe0Dp/lZmPZiDFzJJaOfUpOvv2MKUkoBX8lDrn9vKU=
github.com/huin/goupnp v1.1.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/igorcrevar/ethgo v0.1.1000 h1:YOneC/vjuXYPoNNFi1gy48TRSuOpy3X1apLhHy+amYs=
github.com/igorcrevar/ethgo v0.1.1000/go.mod h1:J+OZNfRCtbaYW3AEc0m47GhwAzlNJjcr9vO86nzOr6E=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/ipfs/boxo v0.8.1 h1:3DkKBCK+3rdEB5t77WDShUXXhktYwH99mkAsgajsKrU=
Expand Down Expand Up @@ -705,8 +707,6 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/umbracle/ethgo v0.1.4-0.20230810113823-c9c19bcd8a1e h1:CPUqjupBwC5EpV/eY/K+E65ZvFZkcXYgWf6KzypBQDY=
github.com/umbracle/ethgo v0.1.4-0.20230810113823-c9c19bcd8a1e/go.mod h1:J+OZNfRCtbaYW3AEc0m47GhwAzlNJjcr9vO86nzOr6E=
github.com/umbracle/fastrlp v0.1.1-0.20230504065717-58a1b8a9929d h1:HAg1Kpr9buwRxEiC2UXU9oT2AU8uCU7o3/WTH+Lt5wo=
github.com/umbracle/fastrlp v0.1.1-0.20230504065717-58a1b8a9929d/go.mod h1:5RHgqiFjd4vLJESMWagP/E7su+5Gzk0iqqmrotR8WdA=
github.com/umbracle/go-eth-bn256 v0.0.0-20230125114011-47cb310d9b0b h1:5/xofhZiOG0I9DQXqDSPxqYObk6QI7mBGMJI+ngyIgc=
Expand Down
335 changes: 335 additions & 0 deletions tracker/blocktracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
package tracker

import (
"context"
"fmt"
"sync"

"github.com/hashicorp/go-hclog"
"github.com/umbracle/ethgo"
bt "github.com/umbracle/ethgo/blocktracker"
)

const (
defaultMaxBlockBacklog = 10
)

// BlockTracker is an interface to track new blocks on the chain
type BlockTracker struct {
config *Config
blocks []*ethgo.Block
blocksLock sync.Mutex
subscriber bt.BlockTrackerInterface
blockChs []chan *bt.BlockEvent
blockChsLock sync.Mutex
provider bt.BlockProvider
closeCh chan struct{}
once sync.Once
logger hclog.Logger
}

type Config struct {
Tracker bt.BlockTrackerInterface
MaxBlockBacklog uint64
}

func DefaultConfig() *Config {
return &Config{
MaxBlockBacklog: defaultMaxBlockBacklog,
}
}

type ConfigOption func(*Config)

func WithBlockMaxBacklog(b uint64) ConfigOption {
return func(c *Config) {
c.MaxBlockBacklog = b
}
}

func WithTracker(b bt.BlockTrackerInterface) ConfigOption {
return func(c *Config) {
c.Tracker = b
}
}

func NewBlockTracker(provider bt.BlockProvider, logger hclog.Logger, opts ...ConfigOption) *BlockTracker {
config := DefaultConfig()
for _, opt := range opts {
opt(config)
}

tracker := config.Tracker
if tracker == nil {
tracker = bt.NewJSONBlockTracker(provider)
}

return &BlockTracker{
blocks: []*ethgo.Block{},
blockChs: []chan *bt.BlockEvent{},
config: config,
subscriber: tracker,
provider: provider,
closeCh: make(chan struct{}),
logger: logger,
blocksLock: sync.Mutex{},
blockChsLock: sync.Mutex{},
once: sync.Once{},
}
}

func (t *BlockTracker) Subscribe() chan *bt.BlockEvent {
t.blockChsLock.Lock()
defer t.blockChsLock.Unlock()

ch := make(chan *bt.BlockEvent, 1)
t.blockChs = append(t.blockChs, ch)

return ch
}

func (t *BlockTracker) AcquireLock() bt.Lock {
return bt.NewLock(&t.blocksLock)
}

func (t *BlockTracker) Init() error {
var (
block *ethgo.Block
err error
i uint64
)

t.once.Do(func() {
block, err = t.provider.GetBlockByNumber(ethgo.Latest, false)
if err != nil {
return
}

if block.Number == 0 {
return
}

blocks := make([]*ethgo.Block, t.config.MaxBlockBacklog)

for i = 0; i < t.config.MaxBlockBacklog; i++ {
blocks[t.config.MaxBlockBacklog-i-1] = block
if block.Number == 0 {
break
}

block, err = t.provider.GetBlockByHash(block.ParentHash, false)
if err != nil {
return
} else if block == nil {
// if block does not exist (for example reorg happened) GetBlockByHash will return nil, nil
err = fmt.Errorf("block with hash %s not found", block.ParentHash)

return
}
}

if i != t.config.MaxBlockBacklog {
// less than maxBacklog elements
blocks = blocks[t.config.MaxBlockBacklog-i-1:]
}

t.blocks = blocks
})

return err
}

func (t *BlockTracker) MaxBlockBacklog() uint64 {
return t.config.MaxBlockBacklog
}

func (t *BlockTracker) LastBlocked() *ethgo.Block {
target := t.blocks[len(t.blocks)-1]
if target == nil {
return nil
}

return target.Copy()
}

func (t *BlockTracker) BlocksBlocked() []*ethgo.Block {
res := make([]*ethgo.Block, len(t.blocks))
for i, block := range t.blocks {
res[i] = block.Copy()
}

return res
}

func (t *BlockTracker) Len() int {
return len(t.blocks)
}

func (t *BlockTracker) Close() error {
close(t.closeCh)

return nil
}

func (t *BlockTracker) Start() error {
ctx, cancelFn := context.WithCancel(context.Background())
go func() {
<-t.closeCh
cancelFn()
}()

// start the polling
return t.subscriber.Track(ctx, func(block *ethgo.Block) error {
return t.HandleTrackedBlock(block)
})
}

func (t *BlockTracker) AddBlockLocked(block *ethgo.Block) error {
if uint64(len(t.blocks)) == t.config.MaxBlockBacklog {
// remove past blocks if there are more than maxReconcileBlocks
t.blocks = t.blocks[1:]
}

if len(t.blocks) != 0 {
if lastNum := t.blocks[len(t.blocks)-1].Number; lastNum+1 != block.Number {
return fmt.Errorf("bad number sequence. %d and %d", lastNum, block.Number)
}
}

t.blocks = append(t.blocks, block)

return nil
}

func (t *BlockTracker) blockAtIndex(hash ethgo.Hash) int {
for indx, b := range t.blocks {
if b.Hash == hash {
return indx
}
}

return -1
}

func (t *BlockTracker) handleReconcileImpl(block *ethgo.Block) ([]*ethgo.Block, int, error) {
// The state is empty
if len(t.blocks) == 0 {
return []*ethgo.Block{block}, -1, nil
}

// Append to the head of the chain
if t.blocks[len(t.blocks)-1].Hash == block.ParentHash {
return []*ethgo.Block{block}, -1, nil
}

// The block already exists, but if not last, remove all the following
if indx := t.blockAtIndex(block.Hash); indx != -1 {
return nil, indx + 1, nil
}

// Fork in the middle of the chain
if indx := t.blockAtIndex(block.ParentHash); indx != -1 {
return []*ethgo.Block{block}, indx + 1, nil
}

// Backfill. We dont know the parent of the block.
// Need to query the chain until we find a known block
var (
added = []*ethgo.Block{block}
count uint64 = 0
indx int
err error
)

for ; count < t.config.MaxBlockBacklog; count++ {
hash := block.ParentHash

block, err = t.provider.GetBlockByHash(hash, false)
if err != nil {
return nil, -1, fmt.Errorf("parent with hash retrieving error: %w", err)
} else if block == nil {
// if block does not exist (for example reorg happened) GetBlockByHash will return nil, nil
return nil, -1, fmt.Errorf("parent with hash %s not found", hash)
}

added = append(added, block)

if indx = t.blockAtIndex(block.ParentHash); indx != -1 {
indx++

break
}
}

// need the blocks in reverse order
for i := len(added)/2 - 1; i >= 0; i-- {
opp := len(added) - 1 - i
added[i], added[opp] = added[opp], added[i]
}

if count == t.config.MaxBlockBacklog {
indx = 0 // remove all from the current state

t.logger.Info("reconcile did not found parent for new blocks", "hash", added[0].Hash)
}

return added, indx, nil
}

func (t *BlockTracker) HandleBlockEvent(block *ethgo.Block) (*bt.BlockEvent, error) {
t.blocksLock.Lock()
defer t.blocksLock.Unlock()

blocks, indx, err := t.handleReconcileImpl(block)
if err != nil {
return nil, err
}

if len(blocks) == 0 {
return nil, nil
}

blockEvnt := &bt.BlockEvent{}

// there are some blocks to remove
if indx >= 0 {
for i := indx; i < len(t.blocks); i++ {
blockEvnt.Removed = append(blockEvnt.Removed, t.blocks[i])
}

t.blocks = t.blocks[:indx]
}

// include the new blocks
for _, block := range blocks {
blockEvnt.Added = append(blockEvnt.Added, block)

if err := t.AddBlockLocked(block); err != nil {
return nil, err
}
}

return blockEvnt, nil
}

func (t *BlockTracker) HandleTrackedBlock(block *ethgo.Block) error {
blockEvnt, err := t.HandleBlockEvent(block)
if err != nil {
return err
}

if blockEvnt != nil {
for _, x := range blockEvnt.Added {
fmt.Println(x.Number, x.Hash)
}

t.blockChsLock.Lock()
defer t.blockChsLock.Unlock()

for _, ch := range t.blockChs {
ch <- blockEvnt
}
}

return nil
}
Loading

0 comments on commit accc3d6

Please sign in to comment.