From 912b63eca7d43c7e20eb14963f80f68866141be3 Mon Sep 17 00:00:00 2001 From: Arnau Date: Tue, 16 Jul 2024 10:08:06 +0200 Subject: [PATCH] make event order deterministic --- localbridgesync/downloader.go | 17 +- localbridgesync/downloader_test.go | 48 ++-- localbridgesync/driver.go | 2 +- localbridgesync/mock_processor_test.go | 10 +- localbridgesync/processor.go | 30 ++- localbridgesync/processor_test.go | 295 +++++++++++-------------- localbridgesync/types.go | 8 +- 7 files changed, 185 insertions(+), 225 deletions(-) diff --git a/localbridgesync/downloader.go b/localbridgesync/downloader.go index 3eb5c7db..69d04051 100644 --- a/localbridgesync/downloader.go +++ b/localbridgesync/downloader.go @@ -139,10 +139,7 @@ func (d *downloaderImplementation) getEventsByBlockRange(ctx context.Context, fr Num: l.BlockNumber, Hash: l.BlockHash, }, - Events: bridgeEvents{ - Claims: []Claim{}, - Bridges: []Bridge{}, - }, + Events: []BridgeEvent{}, }) } d.appendLog(&blocks[len(blocks)-1], l) @@ -185,7 +182,7 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) { l, err, ) } - b.Events.Bridges = append(b.Events.Bridges, Bridge{ + b.Events = append(b.Events, BridgeEvent{Bridge: &Bridge{ LeafType: bridge.LeafType, OriginNetwork: bridge.OriginNetwork, OriginAddress: bridge.OriginAddress, @@ -194,7 +191,7 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) { Amount: bridge.Amount, Metadata: bridge.Metadata, DepositCount: bridge.DepositCount, - }) + }}) case claimEventSignature: claim, err := d.bridgeContractV2.ParseClaimEvent(l) if err != nil { @@ -203,13 +200,13 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) { l, err, ) } - b.Events.Claims = append(b.Events.Claims, Claim{ + b.Events = append(b.Events, BridgeEvent{Claim: &Claim{ GlobalIndex: claim.GlobalIndex, OriginNetwork: claim.OriginNetwork, OriginAddress: claim.OriginAddress, DestinationAddress: claim.DestinationAddress, Amount: claim.Amount, - }) + }}) case claimEventSignaturePreEtrog: claim, err := d.bridgeContractV1.ParseClaimEvent(l) if err != nil { @@ -218,13 +215,13 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) { l, err, ) } - b.Events.Claims = append(b.Events.Claims, Claim{ + b.Events = append(b.Events, BridgeEvent{Claim: &Claim{ GlobalIndex: big.NewInt(int64(claim.Index)), OriginNetwork: claim.OriginNetwork, OriginAddress: claim.OriginAddress, DestinationAddress: claim.DestinationAddress, Amount: claim.Amount, - }) + }}) default: log.Fatalf("unexpected log %+v", l) } diff --git a/localbridgesync/downloader_test.go b/localbridgesync/downloader_test.go index be03a695..db13c4d3 100644 --- a/localbridgesync/downloader_test.go +++ b/localbridgesync/downloader_test.go @@ -61,9 +61,10 @@ func TestGetEventsByBlockRange(t *testing.T) { Num: logC1.BlockNumber, Hash: logC1.BlockHash, }, - Events: bridgeEvents{ - Bridges: []Bridge{bridgeC1}, - Claims: []Claim{}, + Events: []BridgeEvent{ + { + Bridge: &bridgeC1, + }, }, }, } @@ -93,9 +94,11 @@ func TestGetEventsByBlockRange(t *testing.T) { Num: logC2_1.BlockNumber, Hash: logC2_1.BlockHash, }, - Events: bridgeEvents{ - Bridges: []Bridge{bridgeC2_1, bridgeC2_2}, - Claims: []Claim{claimC2_1, claimC2_2}, + Events: []BridgeEvent{ + {Bridge: &bridgeC2_1}, + {Bridge: &bridgeC2_2}, + {Claim: &claimC2_1}, + {Claim: &claimC2_2}, }, }, } @@ -125,9 +128,9 @@ func TestGetEventsByBlockRange(t *testing.T) { Num: logC3_1.BlockNumber, Hash: logC3_1.BlockHash, }, - Events: bridgeEvents{ - Bridges: []Bridge{bridgeC3_1, bridgeC3_2}, - Claims: []Claim{}, + Events: []BridgeEvent{ + {Bridge: &bridgeC3_1}, + {Bridge: &bridgeC3_2}, }, }, { @@ -135,9 +138,9 @@ func TestGetEventsByBlockRange(t *testing.T) { Num: logC3_3.BlockNumber, Hash: logC3_3.BlockHash, }, - Events: bridgeEvents{ - Bridges: []Bridge{}, - Claims: []Claim{claimC3_1, claimC3_2}, + Events: []BridgeEvent{ + {Claim: &claimC3_1}, + {Claim: &claimC3_2}, }, }, } @@ -323,11 +326,8 @@ func TestDownload(t *testing.T) { Num: 6, Hash: common.HexToHash("06"), }, - Events: bridgeEvents{ - Claims: []Claim{ - {OriginNetwork: 6}, - }, - Bridges: []Bridge{}, + Events: []BridgeEvent{ + {Claim: &Claim{OriginNetwork: 6}}, }, } b7 := block{ @@ -335,11 +335,8 @@ func TestDownload(t *testing.T) { Num: 7, Hash: common.HexToHash("07"), }, - Events: bridgeEvents{ - Claims: []Claim{}, - Bridges: []Bridge{ - {DestinationNetwork: 7}, - }, + Events: []BridgeEvent{ + {Bridge: &Bridge{DestinationNetwork: 7}}, }, } b8 := block{ @@ -378,11 +375,8 @@ func TestDownload(t *testing.T) { Num: 30, Hash: common.HexToHash("30"), }, - Events: bridgeEvents{ - Claims: []Claim{}, - Bridges: []Bridge{ - {DestinationNetwork: 30}, - }, + Events: []BridgeEvent{ + {Bridge: &Bridge{DestinationNetwork: 30}}, }, } expectedBlocks = append(expectedBlocks, b30) diff --git a/localbridgesync/driver.go b/localbridgesync/driver.go index cc7b22af..997fd215 100644 --- a/localbridgesync/driver.go +++ b/localbridgesync/driver.go @@ -27,7 +27,7 @@ type driver struct { type processorInterface interface { getLastProcessedBlock(ctx context.Context) (uint64, error) - storeBridgeEvents(blockNum uint64, block bridgeEvents) error + storeBridgeEvents(blockNum uint64, events []BridgeEvent) error reorg(firstReorgedBlock uint64) error } diff --git a/localbridgesync/mock_processor_test.go b/localbridgesync/mock_processor_test.go index 7d9a0cbb..4a629f5c 100644 --- a/localbridgesync/mock_processor_test.go +++ b/localbridgesync/mock_processor_test.go @@ -51,13 +51,13 @@ func (_m *ProcessorMock) reorg(firstReorgedBlock uint64) error { return r0 } -// storeBridgeEvents provides a mock function with given fields: blockNum, block -func (_m *ProcessorMock) storeBridgeEvents(blockNum uint64, block bridgeEvents) error { - ret := _m.Called(blockNum, block) +// storeBridgeEvents provides a mock function with given fields: blockNum, events +func (_m *ProcessorMock) storeBridgeEvents(blockNum uint64, events []BridgeEvent) error { + ret := _m.Called(blockNum, events) var r0 error - if rf, ok := ret.Get(0).(func(uint64, bridgeEvents) error); ok { - r0 = rf(blockNum, block) + if rf, ok := ret.Get(0).(func(uint64, []BridgeEvent) error); ok { + r0 = rf(blockNum, events) } else { r0 = ret.Error(0) } diff --git a/localbridgesync/processor.go b/localbridgesync/processor.go index f93ef3e6..2e6f27a0 100644 --- a/localbridgesync/processor.go +++ b/localbridgesync/processor.go @@ -48,42 +48,40 @@ func newProcessor(dbPath string) (*processor, error) { // If toBlock has not been porcessed yet, ErrBlockNotProcessed will be returned func (p *processor) GetClaimsAndBridges( ctx context.Context, fromBlock, toBlock uint64, -) ([]Claim, []Bridge, error) { - claims := []Claim{} - bridges := []Bridge{} +) ([]BridgeEvent, error) { + events := []BridgeEvent{} tx, err := p.db.BeginRo(ctx) if err != nil { - return nil, nil, err + return nil, err } defer tx.Rollback() lpb, err := p.getLastProcessedBlockWithTx(tx) if lpb < toBlock { - return nil, nil, ErrBlockNotProcessed + return nil, ErrBlockNotProcessed } c, err := tx.Cursor(eventsTable) if err != nil { - return nil, nil, err + return nil, err } defer c.Close() for k, v, err := c.Seek(blockNum2Bytes(fromBlock)); k != nil; k, v, err = c.Next() { if err != nil { - return nil, nil, err + return nil, err } if bytes2BlockNum(k) > toBlock { break } - block := bridgeEvents{} - err := json.Unmarshal(v, &block) + blockEvents := []BridgeEvent{} + err := json.Unmarshal(v, &blockEvents) if err != nil { - return nil, nil, err + return nil, err } - bridges = append(bridges, block.Bridges...) - claims = append(claims, block.Claims...) + events = append(events, blockEvents...) } - return claims, bridges, nil + return events, nil } func (p *processor) getLastProcessedBlock(ctx context.Context) (uint64, error) { @@ -133,13 +131,13 @@ func (p *processor) reorg(firstReorgedBlock uint64) error { return tx.Commit() } -func (p *processor) storeBridgeEvents(blockNum uint64, block bridgeEvents) error { +func (p *processor) storeBridgeEvents(blockNum uint64, events []BridgeEvent) error { tx, err := p.db.BeginRw(context.Background()) if err != nil { return err } - if len(block.Bridges) > 0 || len(block.Claims) > 0 { - value, err := json.Marshal(block) + if len(events) > 0 { + value, err := json.Marshal(events) if err != nil { tx.Rollback() return err diff --git a/localbridgesync/processor_test.go b/localbridgesync/processor_test.go index 0c446d3d..8e6884c2 100644 --- a/localbridgesync/processor_test.go +++ b/localbridgesync/processor_test.go @@ -37,20 +37,19 @@ func TestProceessor(t *testing.T) { expectedErr: nil, }, &getClaimsAndBridgesAction{ - p: p, - description: "on an empty processor", - ctx: context.Background(), - fromBlock: 0, - toBlock: 2, - expectedClaims: nil, - expectedBridges: nil, - expectedErr: ErrBlockNotProcessed, + p: p, + description: "on an empty processor", + ctx: context.Background(), + fromBlock: 0, + toBlock: 2, + expectedEvents: nil, + expectedErr: ErrBlockNotProcessed, }, &storeBridgeEventsAction{ p: p, description: "block1", blockNum: block1.Num, - block: block1.Events, + events: block1.Events, expectedErr: nil, }, // processed: block1 @@ -62,24 +61,22 @@ func TestProceessor(t *testing.T) { expectedErr: nil, }, &getClaimsAndBridgesAction{ - p: p, - description: "after block1: range 0, 2", - ctx: context.Background(), - fromBlock: 0, - toBlock: 2, - expectedClaims: nil, - expectedBridges: nil, - expectedErr: ErrBlockNotProcessed, + p: p, + description: "after block1: range 0, 2", + ctx: context.Background(), + fromBlock: 0, + toBlock: 2, + expectedEvents: nil, + expectedErr: ErrBlockNotProcessed, }, &getClaimsAndBridgesAction{ - p: p, - description: "after block1: range 1, 1", - ctx: context.Background(), - fromBlock: 1, - toBlock: 1, - expectedClaims: block1.Events.Claims, - expectedBridges: block1.Events.Bridges, - expectedErr: nil, + p: p, + description: "after block1: range 1, 1", + ctx: context.Background(), + fromBlock: 1, + toBlock: 1, + expectedEvents: block1.Events, + expectedErr: nil, }, &reorgAction{ p: p, @@ -89,20 +86,19 @@ func TestProceessor(t *testing.T) { }, // processed: ~ &getClaimsAndBridgesAction{ - p: p, - description: "after block1 reorged", - ctx: context.Background(), - fromBlock: 0, - toBlock: 2, - expectedClaims: nil, - expectedBridges: nil, - expectedErr: ErrBlockNotProcessed, + p: p, + description: "after block1 reorged", + ctx: context.Background(), + fromBlock: 0, + toBlock: 2, + expectedEvents: nil, + expectedErr: ErrBlockNotProcessed, }, &storeBridgeEventsAction{ p: p, description: "block1 (after it's reorged)", blockNum: block1.Num, - block: block1.Events, + events: block1.Events, expectedErr: nil, }, // processed: block3 @@ -110,7 +106,7 @@ func TestProceessor(t *testing.T) { p: p, description: "block3", blockNum: block3.Num, - block: block3.Events, + events: block3.Events, expectedErr: nil, }, // processed: block1, block3 @@ -122,24 +118,22 @@ func TestProceessor(t *testing.T) { expectedErr: nil, }, &getClaimsAndBridgesAction{ - p: p, - description: "after block3: range 2, 2", - ctx: context.Background(), - fromBlock: 2, - toBlock: 2, - expectedClaims: []Claim{}, - expectedBridges: []Bridge{}, - expectedErr: nil, + p: p, + description: "after block3: range 2, 2", + ctx: context.Background(), + fromBlock: 2, + toBlock: 2, + expectedEvents: []BridgeEvent{}, + expectedErr: nil, }, &getClaimsAndBridgesAction{ - p: p, - description: "after block3: range 1, 3", - ctx: context.Background(), - fromBlock: 1, - toBlock: 3, - expectedClaims: append(block1.Events.Claims, block3.Events.Claims...), - expectedBridges: append(block1.Events.Bridges, block3.Events.Bridges...), - expectedErr: nil, + p: p, + description: "after block3: range 1, 3", + ctx: context.Background(), + fromBlock: 1, + toBlock: 3, + expectedEvents: append(block1.Events, block3.Events...), + expectedErr: nil, }, &reorgAction{ p: p, @@ -172,7 +166,7 @@ func TestProceessor(t *testing.T) { p: p, description: "block3 after reorg", blockNum: block3.Num, - block: block3.Events, + events: block3.Events, expectedErr: nil, }, // processed: block1, block3 @@ -180,7 +174,7 @@ func TestProceessor(t *testing.T) { p: p, description: "block4", blockNum: block4.Num, - block: block4.Events, + events: block4.Events, expectedErr: nil, }, // processed: block1, block3, block4 @@ -188,7 +182,7 @@ func TestProceessor(t *testing.T) { p: p, description: "block5", blockNum: block5.Num, - block: block5.Events, + events: block5.Events, expectedErr: nil, }, // processed: block1, block3, block4, block5 @@ -200,24 +194,22 @@ func TestProceessor(t *testing.T) { expectedErr: nil, }, &getClaimsAndBridgesAction{ - p: p, - description: "after block5: range 1, 3", - ctx: context.Background(), - fromBlock: 1, - toBlock: 3, - expectedClaims: append(block1.Events.Claims, block3.Events.Claims...), - expectedBridges: append(block1.Events.Bridges, block3.Events.Bridges...), - expectedErr: nil, + p: p, + description: "after block5: range 1, 3", + ctx: context.Background(), + fromBlock: 1, + toBlock: 3, + expectedEvents: append(block1.Events, block3.Events...), + expectedErr: nil, }, &getClaimsAndBridgesAction{ - p: p, - description: "after block5: range 4, 5", - ctx: context.Background(), - fromBlock: 4, - toBlock: 5, - expectedClaims: append(block4.Events.Claims, block5.Events.Claims...), - expectedBridges: append(block4.Events.Bridges, block5.Events.Bridges...), - expectedErr: nil, + p: p, + description: "after block5: range 4, 5", + ctx: context.Background(), + fromBlock: 4, + toBlock: 5, + expectedEvents: append(block4.Events, block5.Events...), + expectedErr: nil, }, &getClaimsAndBridgesAction{ p: p, @@ -225,17 +217,11 @@ func TestProceessor(t *testing.T) { ctx: context.Background(), fromBlock: 0, toBlock: 5, - expectedClaims: slices.Concat( - block1.Events.Claims, - block3.Events.Claims, - block4.Events.Claims, - block5.Events.Claims, - ), - expectedBridges: slices.Concat( - block1.Events.Bridges, - block3.Events.Bridges, - block4.Events.Bridges, - block5.Events.Bridges, + expectedEvents: slices.Concat( + block1.Events, + block3.Events, + block4.Events, + block5.Events, ), expectedErr: nil, }, @@ -256,28 +242,24 @@ var ( Num: 1, Hash: common.HexToHash("01"), }, - Events: bridgeEvents{ - Bridges: []Bridge{ - { - LeafType: 1, - OriginNetwork: 1, - OriginAddress: common.HexToAddress("01"), - DestinationNetwork: 1, - DestinationAddress: common.HexToAddress("01"), - Amount: big.NewInt(1), - Metadata: common.Hex2Bytes("01"), - DepositCount: 1, - }, - }, - Claims: []Claim{ - { - GlobalIndex: big.NewInt(1), - OriginNetwork: 1, - OriginAddress: common.HexToAddress("01"), - DestinationAddress: common.HexToAddress("01"), - Amount: big.NewInt(1), - }, - }, + Events: []BridgeEvent{ + {Bridge: &Bridge{ + LeafType: 1, + OriginNetwork: 1, + OriginAddress: common.HexToAddress("01"), + DestinationNetwork: 1, + DestinationAddress: common.HexToAddress("01"), + Amount: big.NewInt(1), + Metadata: common.Hex2Bytes("01"), + DepositCount: 1, + }}, + {Claim: &Claim{ + GlobalIndex: big.NewInt(1), + OriginNetwork: 1, + OriginAddress: common.HexToAddress("01"), + DestinationAddress: common.HexToAddress("01"), + Amount: big.NewInt(1), + }}, }, } block3 = block{ @@ -285,30 +267,27 @@ var ( Num: 3, Hash: common.HexToHash("02"), }, - Events: bridgeEvents{ - Bridges: []Bridge{ - { - LeafType: 2, - OriginNetwork: 2, - OriginAddress: common.HexToAddress("02"), - DestinationNetwork: 2, - DestinationAddress: common.HexToAddress("02"), - Amount: big.NewInt(2), - Metadata: common.Hex2Bytes("02"), - DepositCount: 2, - }, - { - LeafType: 3, - OriginNetwork: 3, - OriginAddress: common.HexToAddress("03"), - DestinationNetwork: 3, - DestinationAddress: common.HexToAddress("03"), - Amount: nil, - Metadata: common.Hex2Bytes("03"), - DepositCount: 3, - }, - }, - Claims: []Claim{}, + Events: []BridgeEvent{ + {Bridge: &Bridge{ + LeafType: 2, + OriginNetwork: 2, + OriginAddress: common.HexToAddress("02"), + DestinationNetwork: 2, + DestinationAddress: common.HexToAddress("02"), + Amount: big.NewInt(2), + Metadata: common.Hex2Bytes("02"), + DepositCount: 2, + }}, + {Bridge: &Bridge{ + LeafType: 3, + OriginNetwork: 3, + OriginAddress: common.HexToAddress("03"), + DestinationNetwork: 3, + DestinationAddress: common.HexToAddress("03"), + Amount: nil, + Metadata: common.Hex2Bytes("03"), + DepositCount: 3, + }}, }, } block4 = block{ @@ -316,34 +295,28 @@ var ( Num: 4, Hash: common.HexToHash("03"), }, - Events: bridgeEvents{ - Bridges: []Bridge{}, - Claims: []Claim{}, - }, + Events: []BridgeEvent{}, } block5 = block{ blockHeader: blockHeader{ Num: 5, Hash: common.HexToHash("04"), }, - Events: bridgeEvents{ - Bridges: []Bridge{}, - Claims: []Claim{ - { - GlobalIndex: big.NewInt(4), - OriginNetwork: 4, - OriginAddress: common.HexToAddress("04"), - DestinationAddress: common.HexToAddress("04"), - Amount: big.NewInt(4), - }, - { - GlobalIndex: big.NewInt(5), - OriginNetwork: 5, - OriginAddress: common.HexToAddress("05"), - DestinationAddress: common.HexToAddress("05"), - Amount: big.NewInt(5), - }, - }, + Events: []BridgeEvent{ + {Claim: &Claim{ + GlobalIndex: big.NewInt(4), + OriginNetwork: 4, + OriginAddress: common.HexToAddress("04"), + DestinationAddress: common.HexToAddress("04"), + Amount: big.NewInt(4), + }}, + {Claim: &Claim{ + GlobalIndex: big.NewInt(5), + OriginNetwork: 5, + OriginAddress: common.HexToAddress("05"), + DestinationAddress: common.HexToAddress("05"), + Amount: big.NewInt(5), + }}, }, } ) @@ -359,14 +332,13 @@ type processAction interface { // GetClaimsAndBridges type getClaimsAndBridgesAction struct { - p *processor - description string - ctx context.Context - fromBlock uint64 - toBlock uint64 - expectedClaims []Claim - expectedBridges []Bridge - expectedErr error + p *processor + description string + ctx context.Context + fromBlock uint64 + toBlock uint64 + expectedEvents []BridgeEvent + expectedErr error } func (a *getClaimsAndBridgesAction) method() string { @@ -378,9 +350,8 @@ func (a *getClaimsAndBridgesAction) desc() string { } func (a *getClaimsAndBridgesAction) execute(t *testing.T) { - actualClaims, actualBridges, actualErr := a.p.GetClaimsAndBridges(a.ctx, a.fromBlock, a.toBlock) - require.Equal(t, a.expectedClaims, actualClaims) - require.Equal(t, a.expectedBridges, actualBridges) + actualEvents, actualErr := a.p.GetClaimsAndBridges(a.ctx, a.fromBlock, a.toBlock) + require.Equal(t, a.expectedEvents, actualEvents) require.Equal(t, a.expectedErr, actualErr) } @@ -436,7 +407,7 @@ type storeBridgeEventsAction struct { p *processor description string blockNum uint64 - block bridgeEvents + events []BridgeEvent expectedErr error } @@ -449,6 +420,6 @@ func (a *storeBridgeEventsAction) desc() string { } func (a *storeBridgeEventsAction) execute(t *testing.T) { - actualErr := a.p.storeBridgeEvents(a.blockNum, a.block) + actualErr := a.p.storeBridgeEvents(a.blockNum, a.events) require.Equal(t, a.expectedErr, actualErr) } diff --git a/localbridgesync/types.go b/localbridgesync/types.go index 7e8748fa..3a6a508e 100644 --- a/localbridgesync/types.go +++ b/localbridgesync/types.go @@ -26,14 +26,14 @@ type Claim struct { Amount *big.Int } -type bridgeEvents struct { - Bridges []Bridge - Claims []Claim +type BridgeEvent struct { + Bridge *Bridge + Claim *Claim } type block struct { blockHeader - Events bridgeEvents + Events []BridgeEvent } type blockHeader struct {