Skip to content

Commit

Permalink
feat: save block hash to block table and add more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Jan 10, 2025
1 parent 934bb89 commit 374eafc
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 24 deletions.
2 changes: 1 addition & 1 deletion bridgesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestBridgeEventE2E(t *testing.T) {
dbPathReorg := path.Join(t.TempDir(), "file::memory:?cache=shared")

client, setup := helpers.SimulatedBackend(t, nil, 0)
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg})
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg}, reorgdetector.L1)
require.NoError(t, err)

go rd.Start(ctx) //nolint:errcheck
Expand Down
7 changes: 4 additions & 3 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,9 @@ func newState(c *config.Config, l2ChainID uint64, sqlDB *pgxpool.Pool) *state.St
func newReorgDetector(
cfg *reorgdetector.Config,
client *ethclient.Client,
network reorgdetector.Network,
) *reorgdetector.ReorgDetector {
rd, err := reorgdetector.New(client, *cfg)
rd, err := reorgdetector.New(client, *cfg, network)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -600,7 +601,7 @@ func runReorgDetectorL1IfNeeded(
components) {
return nil, nil
}
rd := newReorgDetector(cfg, l1Client)
rd := newReorgDetector(cfg, l1Client, reorgdetector.L1)

errChan := make(chan error)
go func() {
Expand All @@ -622,7 +623,7 @@ func runReorgDetectorL2IfNeeded(
if !isNeeded([]string{cdkcommon.AGGORACLE, cdkcommon.RPC, cdkcommon.AGGSENDER}, components) {
return nil, nil
}
rd := newReorgDetector(cfg, l2Client)
rd := newReorgDetector(cfg, l2Client, reorgdetector.L2)

errChan := make(chan error)
go func() {
Expand Down
4 changes: 2 additions & 2 deletions l1infotreesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestWithReorgs(t *testing.T) {

client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)

rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)})
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)}, reorgdetector.L1)
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))

Expand Down Expand Up @@ -278,7 +278,7 @@ func TestStressAndReorgs(t *testing.T) {

client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)

rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, reorgdetector.L1)
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))

Expand Down
11 changes: 7 additions & 4 deletions l1infotreesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
}
}()

if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, block.Num); err != nil {
if _, err := tx.Exec(`INSERT INTO block (num, hash) VALUES ($1, $2)`, block.Num, block.Hash.String()); err != nil {
return fmt.Errorf("insert Block. err: %w", err)
}

Expand Down Expand Up @@ -344,6 +344,9 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
l1InfoLeavesAdded++
}
if event.UpdateL1InfoTreeV2 != nil {
log.Debugf("handle UpdateL1InfoTreeV2 event. Block: %d, block hash: %s. Event root: %s. Event leaf count: %d.",
block.Num, block.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(), event.UpdateL1InfoTreeV2.LeafCount)

root, err := p.l1InfoTree.GetLastRoot(tx)
if err != nil {
return fmt.Errorf("GetLastRoot(). err: %w", err)
Expand All @@ -355,10 +358,10 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
if root.Hash != event.UpdateL1InfoTreeV2.CurrentL1InfoRoot || root.Index+1 != event.UpdateL1InfoTreeV2.LeafCount {
errStr := fmt.Sprintf(
"failed to check UpdateL1InfoTreeV2. Root: %s vs event:%s. "+
"Index: : %d vs event.LeafCount:%d. Happened on block %d",
root.Hash, common.Bytes2Hex(event.UpdateL1InfoTreeV2.CurrentL1InfoRoot[:]),
"Index: %d vs event.LeafCount: %d. Happened on block %d. Block hash: %s.",
root.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(),
root.Index, event.UpdateL1InfoTreeV2.LeafCount,
block.Num,
block.Num, block.Hash.String(),
)
log.Error(errStr)
p.haltedReason = errStr
Expand Down
18 changes: 17 additions & 1 deletion reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ import (
"golang.org/x/sync/errgroup"
)

type Network string

const (
L1 Network = "l1"
L2 Network = "l2"
)

func (n Network) String() string {
return string(n)
}

type EthClient interface {
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
Expand All @@ -34,9 +45,11 @@ type ReorgDetector struct {

subscriptionsLock sync.RWMutex
subscriptions map[string]*Subscription

log *log.Logger
}

func New(client EthClient, cfg Config) (*ReorgDetector, error) {
func New(client EthClient, cfg Config, network Network) (*ReorgDetector, error) {
err := migrations.RunMigrations(cfg.DBPath)
if err != nil {
return nil, err
Expand All @@ -52,6 +65,7 @@ func New(client EthClient, cfg Config) (*ReorgDetector, error) {
checkReorgInterval: cfg.GetCheckReorgsInterval(),
trackedBlocks: make(map[string]*headersList),
subscriptions: make(map[string]*Subscription),
log: log.WithFields("reorg-detector", network.String()),
}, nil
}

Expand Down Expand Up @@ -122,6 +136,8 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
errGroup errgroup.Group
)

rd.log.Infof("Checking reorgs in tracked blocks up to block %d", lastFinalisedBlock.Number.Uint64())

subscriberIDs := rd.getSubscriberIDs()

for _, id := range subscriberIDs {
Expand Down
3 changes: 3 additions & 0 deletions reorgdetector/reorgdetector_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func (rd *ReorgDetector) saveTrackedBlock(id string, b header) error {
hdrs.add(b)
}
rd.trackedBlocksLock.Unlock()

rd.log.Debugf("Tracking block %d for subscriber %s", b.Num, id)

return meddler.Insert(rd.db, "tracked_block", &headerWithSubscriberID{
SubscriberID: id,
Num: b.Num,
Expand Down
2 changes: 2 additions & 0 deletions reorgdetector/reorgdetector_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) {
sub, ok := rd.subscriptions[id]
rd.subscriptionsLock.RUnlock()

rd.log.Infof("Reorg detected for subscriber %s at block %d", id, startingBlock.Num)

if ok {
sub.ReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
Expand Down
6 changes: 3 additions & 3 deletions reorgdetector/reorgdetector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Test_ReorgDetector(t *testing.T) {
// Create test DB dir
testDir := path.Join(t.TempDir(), "file::memory:?cache=shared")

reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1)
require.NoError(t, err)

err = reorgDetector.Start(ctx)
Expand Down Expand Up @@ -76,7 +76,7 @@ func Test_ReorgDetector(t *testing.T) {
func TestGetTrackedBlocks(t *testing.T) {
clientL1, _ := helpers.SimulatedBackend(t, nil, 0)
testDir := path.Join(t.TempDir(), "file::memory:?cache=shared")
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1)
require.NoError(t, err)
list, err := reorgDetector.getTrackedBlocks()
require.NoError(t, err)
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestGetTrackedBlocks(t *testing.T) {
func TestNotSubscribed(t *testing.T) {
clientL1, _ := helpers.SimulatedBackend(t, nil, 0)
testDir := path.Join(t.TempDir(), "file::memory:?cache=shared")
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1)
require.NoError(t, err)
err = reorgDetector.AddBlockToTrack(context.Background(), "foo", 1, common.Hash{})
require.True(t, strings.Contains(err.Error(), "is not subscribed"))
Expand Down
3 changes: 3 additions & 0 deletions sync/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package sync
import (
"context"
"errors"

"github.com/ethereum/go-ethereum/common"
)

var ErrInconsistentState = errors.New("state is inconsistent, try again later once the state is consolidated")

type Block struct {
Num uint64
Events []interface{}
Hash common.Hash
}

type ProcessorInterface interface {
Expand Down
2 changes: 1 addition & 1 deletion sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
for {
select {
case <-ctx.Done():
d.log.Debug("closing channel")
d.log.Info("closing evm downloader channel")
close(downloadedCh)
return
default:
Expand Down
3 changes: 2 additions & 1 deletion sync/evmdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ reset:
d.log.Debugf("handleNewBlock, blockNum: %d, blockHash: %s", b.Num, b.Hash)
d.handleNewBlock(ctx, cancel, b)
case firstReorgedBlock := <-d.reorgSub.ReorgedBlock:
d.log.Debug("handleReorg from block: ", firstReorgedBlock)
d.log.Info("handleReorg from block: ", firstReorgedBlock)
d.handleReorg(ctx, cancel, firstReorgedBlock)
goto reset
}
Expand Down Expand Up @@ -143,6 +143,7 @@ func (d *EVMDriver) handleNewBlock(ctx context.Context, cancel context.CancelFun
blockToProcess := Block{
Num: b.Num,
Events: b.Events,
Hash: b.Hash,
}
err := d.processor.ProcessBlock(ctx, blockToProcess)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions sync/evmdriver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ func TestSync(t *testing.T) {
Return(uint64(3), nil)
rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock1.Num, expectedBlock1.Hash).
Return(nil)
pm.On("ProcessBlock", ctx, Block{Num: expectedBlock1.Num, Events: expectedBlock1.Events}).
pm.On("ProcessBlock", ctx, Block{Num: expectedBlock1.Num, Events: expectedBlock1.Events, Hash: expectedBlock1.Hash}).
Return(nil)
rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock2.Num, expectedBlock2.Hash).
Return(nil)
pm.On("ProcessBlock", ctx, Block{Num: expectedBlock2.Num, Events: expectedBlock2.Events}).
pm.On("ProcessBlock", ctx, Block{Num: expectedBlock2.Num, Events: expectedBlock2.Events, Hash: expectedBlock2.Hash}).
Return(nil)
go driver.Sync(ctx)
time.Sleep(time.Millisecond * 200) // time to download expectedBlock1
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestHandleNewBlock(t *testing.T) {
rdm.
On("AddBlockToTrack", ctx, reorgDetectorID, b1.Num, b1.Hash).
Return(nil)
pm.On("ProcessBlock", ctx, Block{Num: b1.Num, Events: b1.Events}).
pm.On("ProcessBlock", ctx, Block{Num: b1.Num, Events: b1.Events, Hash: b1.Hash}).
Return(nil)
driver.handleNewBlock(ctx, nil, b1)

Expand All @@ -159,7 +159,7 @@ func TestHandleNewBlock(t *testing.T) {
rdm.
On("AddBlockToTrack", ctx, reorgDetectorID, b2.Num, b2.Hash).
Return(nil).Once()
pm.On("ProcessBlock", ctx, Block{Num: b2.Num, Events: b2.Events}).
pm.On("ProcessBlock", ctx, Block{Num: b2.Num, Events: b2.Events, Hash: b2.Hash}).
Return(nil)
driver.handleNewBlock(ctx, nil, b2)

Expand All @@ -173,9 +173,9 @@ func TestHandleNewBlock(t *testing.T) {
rdm.
On("AddBlockToTrack", ctx, reorgDetectorID, b3.Num, b3.Hash).
Return(nil)
pm.On("ProcessBlock", ctx, Block{Num: b3.Num, Events: b3.Events}).
pm.On("ProcessBlock", ctx, Block{Num: b3.Num, Events: b3.Events, Hash: b3.Hash}).
Return(errors.New("foo")).Once()
pm.On("ProcessBlock", ctx, Block{Num: b3.Num, Events: b3.Events}).
pm.On("ProcessBlock", ctx, Block{Num: b3.Num, Events: b3.Events, Hash: b3.Hash}).
Return(nil).Once()
driver.handleNewBlock(ctx, nil, b3)

Expand All @@ -189,7 +189,7 @@ func TestHandleNewBlock(t *testing.T) {
rdm.
On("AddBlockToTrack", ctx, reorgDetectorID, b4.Num, b4.Hash).
Return(nil)
pm.On("ProcessBlock", ctx, Block{Num: b4.Num, Events: b4.Events}).
pm.On("ProcessBlock", ctx, Block{Num: b4.Num, Events: b4.Events, Hash: b4.Hash}).
Return(ErrInconsistentState)
cancelIsCalled := false
cancel := func() {
Expand Down
2 changes: 1 addition & 1 deletion test/aggoraclehelpers/aggoracle_e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func CommonSetup(t *testing.T) (

// Reorg detector
dbPathReorgDetector := path.Join(t.TempDir(), "file::memory:?cache=shared")
reorg, err := reorgdetector.New(l1Client.Client(), reorgdetector.Config{DBPath: dbPathReorgDetector})
reorg, err := reorgdetector.New(l1Client.Client(), reorgdetector.Config{DBPath: dbPathReorgDetector}, reorgdetector.L1)
require.NoError(t, err)

// Syncer
Expand Down

0 comments on commit 374eafc

Please sign in to comment.