From 374eafcd07311fbfd712000d5a2a742d578f27f7 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Fri, 10 Jan 2025 11:35:22 +0100 Subject: [PATCH] feat: save block hash to block table and add more logs --- bridgesync/e2e_test.go | 2 +- cmd/run.go | 7 ++++--- l1infotreesync/e2e_test.go | 4 ++-- l1infotreesync/processor.go | 11 +++++++---- reorgdetector/reorgdetector.go | 18 +++++++++++++++++- reorgdetector/reorgdetector_db.go | 3 +++ reorgdetector/reorgdetector_sub.go | 2 ++ reorgdetector/reorgdetector_test.go | 6 +++--- sync/driver.go | 3 +++ sync/evmdownloader.go | 2 +- sync/evmdriver.go | 3 ++- sync/evmdriver_test.go | 14 +++++++------- test/aggoraclehelpers/aggoracle_e2e.go | 2 +- 13 files changed, 53 insertions(+), 24 deletions(-) diff --git a/bridgesync/e2e_test.go b/bridgesync/e2e_test.go index 6f1e10c4..9f5a2bd4 100644 --- a/bridgesync/e2e_test.go +++ b/bridgesync/e2e_test.go @@ -23,7 +23,7 @@ func TestBridgeEventE2E(t *testing.T) { dbPathReorg := path.Join(t.TempDir(), "file::memory:?cache=shared") client, setup := helpers.SimulatedBackend(t, nil, 0) - rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg}) + rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg}, reorgdetector.L1) require.NoError(t, err) go rd.Start(ctx) //nolint:errcheck diff --git a/cmd/run.go b/cmd/run.go index 727533e8..da21154b 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -500,8 +500,9 @@ func newState(c *config.Config, l2ChainID uint64, sqlDB *pgxpool.Pool) *state.St func newReorgDetector( cfg *reorgdetector.Config, client *ethclient.Client, + network reorgdetector.Network, ) *reorgdetector.ReorgDetector { - rd, err := reorgdetector.New(client, *cfg) + rd, err := reorgdetector.New(client, *cfg, network) if err != nil { log.Fatal(err) } @@ -600,7 +601,7 @@ func runReorgDetectorL1IfNeeded( components) { return nil, nil } - rd := newReorgDetector(cfg, l1Client) + rd := newReorgDetector(cfg, l1Client, reorgdetector.L1) errChan := make(chan error) go func() { @@ -622,7 +623,7 @@ func runReorgDetectorL2IfNeeded( if !isNeeded([]string{cdkcommon.AGGORACLE, cdkcommon.RPC, cdkcommon.AGGSENDER}, components) { return nil, nil } - rd := newReorgDetector(cfg, l2Client) + rd := newReorgDetector(cfg, l2Client, reorgdetector.L2) errChan := make(chan error) go func() { diff --git a/l1infotreesync/e2e_test.go b/l1infotreesync/e2e_test.go index 132f563f..2e22e138 100644 --- a/l1infotreesync/e2e_test.go +++ b/l1infotreesync/e2e_test.go @@ -160,7 +160,7 @@ func TestWithReorgs(t *testing.T) { client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t) - rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)}) + rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)}, reorgdetector.L1) require.NoError(t, err) require.NoError(t, rd.Start(ctx)) @@ -278,7 +278,7 @@ func TestStressAndReorgs(t *testing.T) { client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t) - rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) + rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, reorgdetector.L1) require.NoError(t, err) require.NoError(t, rd.Start(ctx)) diff --git a/l1infotreesync/processor.go b/l1infotreesync/processor.go index ee94e829..42dd1bfc 100644 --- a/l1infotreesync/processor.go +++ b/l1infotreesync/processor.go @@ -294,7 +294,7 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error { } }() - if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, block.Num); err != nil { + if _, err := tx.Exec(`INSERT INTO block (num, hash) VALUES ($1, $2)`, block.Num, block.Hash.String()); err != nil { return fmt.Errorf("insert Block. err: %w", err) } @@ -344,6 +344,9 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error { l1InfoLeavesAdded++ } if event.UpdateL1InfoTreeV2 != nil { + log.Debugf("handle UpdateL1InfoTreeV2 event. Block: %d, block hash: %s. Event root: %s. Event leaf count: %d.", + block.Num, block.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(), event.UpdateL1InfoTreeV2.LeafCount) + root, err := p.l1InfoTree.GetLastRoot(tx) if err != nil { return fmt.Errorf("GetLastRoot(). err: %w", err) @@ -355,10 +358,10 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error { if root.Hash != event.UpdateL1InfoTreeV2.CurrentL1InfoRoot || root.Index+1 != event.UpdateL1InfoTreeV2.LeafCount { errStr := fmt.Sprintf( "failed to check UpdateL1InfoTreeV2. Root: %s vs event:%s. "+ - "Index: : %d vs event.LeafCount:%d. Happened on block %d", - root.Hash, common.Bytes2Hex(event.UpdateL1InfoTreeV2.CurrentL1InfoRoot[:]), + "Index: %d vs event.LeafCount: %d. Happened on block %d. Block hash: %s.", + root.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(), root.Index, event.UpdateL1InfoTreeV2.LeafCount, - block.Num, + block.Num, block.Hash.String(), ) log.Error(errStr) p.haltedReason = errStr diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 91d21354..226a4211 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -18,6 +18,17 @@ import ( "golang.org/x/sync/errgroup" ) +type Network string + +const ( + L1 Network = "l1" + L2 Network = "l2" +) + +func (n Network) String() string { + return string(n) +} + type EthClient interface { SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) @@ -34,9 +45,11 @@ type ReorgDetector struct { subscriptionsLock sync.RWMutex subscriptions map[string]*Subscription + + log *log.Logger } -func New(client EthClient, cfg Config) (*ReorgDetector, error) { +func New(client EthClient, cfg Config, network Network) (*ReorgDetector, error) { err := migrations.RunMigrations(cfg.DBPath) if err != nil { return nil, err @@ -52,6 +65,7 @@ func New(client EthClient, cfg Config) (*ReorgDetector, error) { checkReorgInterval: cfg.GetCheckReorgsInterval(), trackedBlocks: make(map[string]*headersList), subscriptions: make(map[string]*Subscription), + log: log.WithFields("reorg-detector", network.String()), }, nil } @@ -122,6 +136,8 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { errGroup errgroup.Group ) + rd.log.Infof("Checking reorgs in tracked blocks up to block %d", lastFinalisedBlock.Number.Uint64()) + subscriberIDs := rd.getSubscriberIDs() for _, id := range subscriberIDs { diff --git a/reorgdetector/reorgdetector_db.go b/reorgdetector/reorgdetector_db.go index 3a066b7f..f4ec4974 100644 --- a/reorgdetector/reorgdetector_db.go +++ b/reorgdetector/reorgdetector_db.go @@ -60,6 +60,9 @@ func (rd *ReorgDetector) saveTrackedBlock(id string, b header) error { hdrs.add(b) } rd.trackedBlocksLock.Unlock() + + rd.log.Debugf("Tracking block %d for subscriber %s", b.Num, id) + return meddler.Insert(rd.db, "tracked_block", &headerWithSubscriberID{ SubscriberID: id, Num: b.Num, diff --git a/reorgdetector/reorgdetector_sub.go b/reorgdetector/reorgdetector_sub.go index c5002a2b..ca01fd19 100644 --- a/reorgdetector/reorgdetector_sub.go +++ b/reorgdetector/reorgdetector_sub.go @@ -37,6 +37,8 @@ func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) { sub, ok := rd.subscriptions[id] rd.subscriptionsLock.RUnlock() + rd.log.Infof("Reorg detected for subscriber %s at block %d", id, startingBlock.Num) + if ok { sub.ReorgedBlock <- startingBlock.Num <-sub.ReorgProcessed diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index a496d33f..af036561 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -24,7 +24,7 @@ func Test_ReorgDetector(t *testing.T) { // Create test DB dir testDir := path.Join(t.TempDir(), "file::memory:?cache=shared") - reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) + reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1) require.NoError(t, err) err = reorgDetector.Start(ctx) @@ -76,7 +76,7 @@ func Test_ReorgDetector(t *testing.T) { func TestGetTrackedBlocks(t *testing.T) { clientL1, _ := helpers.SimulatedBackend(t, nil, 0) testDir := path.Join(t.TempDir(), "file::memory:?cache=shared") - reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) + reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1) require.NoError(t, err) list, err := reorgDetector.getTrackedBlocks() require.NoError(t, err) @@ -130,7 +130,7 @@ func TestGetTrackedBlocks(t *testing.T) { func TestNotSubscribed(t *testing.T) { clientL1, _ := helpers.SimulatedBackend(t, nil, 0) testDir := path.Join(t.TempDir(), "file::memory:?cache=shared") - reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) + reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1) require.NoError(t, err) err = reorgDetector.AddBlockToTrack(context.Background(), "foo", 1, common.Hash{}) require.True(t, strings.Contains(err.Error(), "is not subscribed")) diff --git a/sync/driver.go b/sync/driver.go index f85c04fb..7d3068fb 100644 --- a/sync/driver.go +++ b/sync/driver.go @@ -3,6 +3,8 @@ package sync import ( "context" "errors" + + "github.com/ethereum/go-ethereum/common" ) var ErrInconsistentState = errors.New("state is inconsistent, try again later once the state is consolidated") @@ -10,6 +12,7 @@ var ErrInconsistentState = errors.New("state is inconsistent, try again later on type Block struct { Num uint64 Events []interface{} + Hash common.Hash } type ProcessorInterface interface { diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index 13539f2f..e22eabd6 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -76,7 +76,7 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download for { select { case <-ctx.Done(): - d.log.Debug("closing channel") + d.log.Info("closing evm downloader channel") close(downloadedCh) return default: diff --git a/sync/evmdriver.go b/sync/evmdriver.go index 3412cd13..43ee2310 100644 --- a/sync/evmdriver.go +++ b/sync/evmdriver.go @@ -101,7 +101,7 @@ reset: d.log.Debugf("handleNewBlock, blockNum: %d, blockHash: %s", b.Num, b.Hash) d.handleNewBlock(ctx, cancel, b) case firstReorgedBlock := <-d.reorgSub.ReorgedBlock: - d.log.Debug("handleReorg from block: ", firstReorgedBlock) + d.log.Info("handleReorg from block: ", firstReorgedBlock) d.handleReorg(ctx, cancel, firstReorgedBlock) goto reset } @@ -143,6 +143,7 @@ func (d *EVMDriver) handleNewBlock(ctx context.Context, cancel context.CancelFun blockToProcess := Block{ Num: b.Num, Events: b.Events, + Hash: b.Hash, } err := d.processor.ProcessBlock(ctx, blockToProcess) if err != nil { diff --git a/sync/evmdriver_test.go b/sync/evmdriver_test.go index ef551d0f..9edbf0b2 100644 --- a/sync/evmdriver_test.go +++ b/sync/evmdriver_test.go @@ -91,11 +91,11 @@ func TestSync(t *testing.T) { Return(uint64(3), nil) rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock1.Num, expectedBlock1.Hash). Return(nil) - pm.On("ProcessBlock", ctx, Block{Num: expectedBlock1.Num, Events: expectedBlock1.Events}). + pm.On("ProcessBlock", ctx, Block{Num: expectedBlock1.Num, Events: expectedBlock1.Events, Hash: expectedBlock1.Hash}). Return(nil) rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock2.Num, expectedBlock2.Hash). Return(nil) - pm.On("ProcessBlock", ctx, Block{Num: expectedBlock2.Num, Events: expectedBlock2.Events}). + pm.On("ProcessBlock", ctx, Block{Num: expectedBlock2.Num, Events: expectedBlock2.Events, Hash: expectedBlock2.Hash}). Return(nil) go driver.Sync(ctx) time.Sleep(time.Millisecond * 200) // time to download expectedBlock1 @@ -142,7 +142,7 @@ func TestHandleNewBlock(t *testing.T) { rdm. On("AddBlockToTrack", ctx, reorgDetectorID, b1.Num, b1.Hash). Return(nil) - pm.On("ProcessBlock", ctx, Block{Num: b1.Num, Events: b1.Events}). + pm.On("ProcessBlock", ctx, Block{Num: b1.Num, Events: b1.Events, Hash: b1.Hash}). Return(nil) driver.handleNewBlock(ctx, nil, b1) @@ -159,7 +159,7 @@ func TestHandleNewBlock(t *testing.T) { rdm. On("AddBlockToTrack", ctx, reorgDetectorID, b2.Num, b2.Hash). Return(nil).Once() - pm.On("ProcessBlock", ctx, Block{Num: b2.Num, Events: b2.Events}). + pm.On("ProcessBlock", ctx, Block{Num: b2.Num, Events: b2.Events, Hash: b2.Hash}). Return(nil) driver.handleNewBlock(ctx, nil, b2) @@ -173,9 +173,9 @@ func TestHandleNewBlock(t *testing.T) { rdm. On("AddBlockToTrack", ctx, reorgDetectorID, b3.Num, b3.Hash). Return(nil) - pm.On("ProcessBlock", ctx, Block{Num: b3.Num, Events: b3.Events}). + pm.On("ProcessBlock", ctx, Block{Num: b3.Num, Events: b3.Events, Hash: b3.Hash}). Return(errors.New("foo")).Once() - pm.On("ProcessBlock", ctx, Block{Num: b3.Num, Events: b3.Events}). + pm.On("ProcessBlock", ctx, Block{Num: b3.Num, Events: b3.Events, Hash: b3.Hash}). Return(nil).Once() driver.handleNewBlock(ctx, nil, b3) @@ -189,7 +189,7 @@ func TestHandleNewBlock(t *testing.T) { rdm. On("AddBlockToTrack", ctx, reorgDetectorID, b4.Num, b4.Hash). Return(nil) - pm.On("ProcessBlock", ctx, Block{Num: b4.Num, Events: b4.Events}). + pm.On("ProcessBlock", ctx, Block{Num: b4.Num, Events: b4.Events, Hash: b4.Hash}). Return(ErrInconsistentState) cancelIsCalled := false cancel := func() { diff --git a/test/aggoraclehelpers/aggoracle_e2e.go b/test/aggoraclehelpers/aggoracle_e2e.go index 7830b941..380652ea 100644 --- a/test/aggoraclehelpers/aggoracle_e2e.go +++ b/test/aggoraclehelpers/aggoracle_e2e.go @@ -106,7 +106,7 @@ func CommonSetup(t *testing.T) ( // Reorg detector dbPathReorgDetector := path.Join(t.TempDir(), "file::memory:?cache=shared") - reorg, err := reorgdetector.New(l1Client.Client(), reorgdetector.Config{DBPath: dbPathReorgDetector}) + reorg, err := reorgdetector.New(l1Client.Client(), reorgdetector.Config{DBPath: dbPathReorgDetector}, reorgdetector.L1) require.NoError(t, err) // Syncer