Skip to content

Commit

Permalink
make event order deterministic
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar authored and Stefan-Ethernal committed Sep 17, 2024
1 parent 0249949 commit 912b63e
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 225 deletions.
17 changes: 7 additions & 10 deletions localbridgesync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,7 @@ func (d *downloaderImplementation) getEventsByBlockRange(ctx context.Context, fr
Num: l.BlockNumber,
Hash: l.BlockHash,
},
Events: bridgeEvents{
Claims: []Claim{},
Bridges: []Bridge{},
},
Events: []BridgeEvent{},
})
}
d.appendLog(&blocks[len(blocks)-1], l)
Expand Down Expand Up @@ -185,7 +182,7 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) {
l, err,
)
}
b.Events.Bridges = append(b.Events.Bridges, Bridge{
b.Events = append(b.Events, BridgeEvent{Bridge: &Bridge{
LeafType: bridge.LeafType,
OriginNetwork: bridge.OriginNetwork,
OriginAddress: bridge.OriginAddress,
Expand All @@ -194,7 +191,7 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) {
Amount: bridge.Amount,
Metadata: bridge.Metadata,
DepositCount: bridge.DepositCount,
})
}})
case claimEventSignature:
claim, err := d.bridgeContractV2.ParseClaimEvent(l)
if err != nil {
Expand All @@ -203,13 +200,13 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) {
l, err,
)
}
b.Events.Claims = append(b.Events.Claims, Claim{
b.Events = append(b.Events, BridgeEvent{Claim: &Claim{
GlobalIndex: claim.GlobalIndex,
OriginNetwork: claim.OriginNetwork,
OriginAddress: claim.OriginAddress,
DestinationAddress: claim.DestinationAddress,
Amount: claim.Amount,
})
}})
case claimEventSignaturePreEtrog:
claim, err := d.bridgeContractV1.ParseClaimEvent(l)
if err != nil {
Expand All @@ -218,13 +215,13 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) {
l, err,
)
}
b.Events.Claims = append(b.Events.Claims, Claim{
b.Events = append(b.Events, BridgeEvent{Claim: &Claim{
GlobalIndex: big.NewInt(int64(claim.Index)),
OriginNetwork: claim.OriginNetwork,
OriginAddress: claim.OriginAddress,
DestinationAddress: claim.DestinationAddress,
Amount: claim.Amount,
})
}})
default:
log.Fatalf("unexpected log %+v", l)
}
Expand Down
48 changes: 21 additions & 27 deletions localbridgesync/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ func TestGetEventsByBlockRange(t *testing.T) {
Num: logC1.BlockNumber,
Hash: logC1.BlockHash,
},
Events: bridgeEvents{
Bridges: []Bridge{bridgeC1},
Claims: []Claim{},
Events: []BridgeEvent{
{
Bridge: &bridgeC1,
},
},
},
}
Expand Down Expand Up @@ -93,9 +94,11 @@ func TestGetEventsByBlockRange(t *testing.T) {
Num: logC2_1.BlockNumber,
Hash: logC2_1.BlockHash,
},
Events: bridgeEvents{
Bridges: []Bridge{bridgeC2_1, bridgeC2_2},
Claims: []Claim{claimC2_1, claimC2_2},
Events: []BridgeEvent{
{Bridge: &bridgeC2_1},
{Bridge: &bridgeC2_2},
{Claim: &claimC2_1},
{Claim: &claimC2_2},
},
},
}
Expand Down Expand Up @@ -125,19 +128,19 @@ func TestGetEventsByBlockRange(t *testing.T) {
Num: logC3_1.BlockNumber,
Hash: logC3_1.BlockHash,
},
Events: bridgeEvents{
Bridges: []Bridge{bridgeC3_1, bridgeC3_2},
Claims: []Claim{},
Events: []BridgeEvent{
{Bridge: &bridgeC3_1},
{Bridge: &bridgeC3_2},
},
},
{
blockHeader: blockHeader{
Num: logC3_3.BlockNumber,
Hash: logC3_3.BlockHash,
},
Events: bridgeEvents{
Bridges: []Bridge{},
Claims: []Claim{claimC3_1, claimC3_2},
Events: []BridgeEvent{
{Claim: &claimC3_1},
{Claim: &claimC3_2},
},
},
}
Expand Down Expand Up @@ -323,23 +326,17 @@ func TestDownload(t *testing.T) {
Num: 6,
Hash: common.HexToHash("06"),
},
Events: bridgeEvents{
Claims: []Claim{
{OriginNetwork: 6},
},
Bridges: []Bridge{},
Events: []BridgeEvent{
{Claim: &Claim{OriginNetwork: 6}},
},
}
b7 := block{
blockHeader: blockHeader{
Num: 7,
Hash: common.HexToHash("07"),
},
Events: bridgeEvents{
Claims: []Claim{},
Bridges: []Bridge{
{DestinationNetwork: 7},
},
Events: []BridgeEvent{
{Bridge: &Bridge{DestinationNetwork: 7}},
},
}
b8 := block{
Expand Down Expand Up @@ -378,11 +375,8 @@ func TestDownload(t *testing.T) {
Num: 30,
Hash: common.HexToHash("30"),
},
Events: bridgeEvents{
Claims: []Claim{},
Bridges: []Bridge{
{DestinationNetwork: 30},
},
Events: []BridgeEvent{
{Bridge: &Bridge{DestinationNetwork: 30}},
},
}
expectedBlocks = append(expectedBlocks, b30)
Expand Down
2 changes: 1 addition & 1 deletion localbridgesync/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type driver struct {

type processorInterface interface {
getLastProcessedBlock(ctx context.Context) (uint64, error)
storeBridgeEvents(blockNum uint64, block bridgeEvents) error
storeBridgeEvents(blockNum uint64, events []BridgeEvent) error
reorg(firstReorgedBlock uint64) error
}

Expand Down
10 changes: 5 additions & 5 deletions localbridgesync/mock_processor_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 14 additions & 16 deletions localbridgesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,42 +48,40 @@ func newProcessor(dbPath string) (*processor, error) {
// If toBlock has not been porcessed yet, ErrBlockNotProcessed will be returned
func (p *processor) GetClaimsAndBridges(
ctx context.Context, fromBlock, toBlock uint64,
) ([]Claim, []Bridge, error) {
claims := []Claim{}
bridges := []Bridge{}
) ([]BridgeEvent, error) {
events := []BridgeEvent{}

tx, err := p.db.BeginRo(ctx)
if err != nil {
return nil, nil, err
return nil, err
}
defer tx.Rollback()
lpb, err := p.getLastProcessedBlockWithTx(tx)
if lpb < toBlock {
return nil, nil, ErrBlockNotProcessed
return nil, ErrBlockNotProcessed
}
c, err := tx.Cursor(eventsTable)
if err != nil {
return nil, nil, err
return nil, err
}
defer c.Close()

for k, v, err := c.Seek(blockNum2Bytes(fromBlock)); k != nil; k, v, err = c.Next() {
if err != nil {
return nil, nil, err
return nil, err
}
if bytes2BlockNum(k) > toBlock {
break
}
block := bridgeEvents{}
err := json.Unmarshal(v, &block)
blockEvents := []BridgeEvent{}
err := json.Unmarshal(v, &blockEvents)
if err != nil {
return nil, nil, err
return nil, err
}
bridges = append(bridges, block.Bridges...)
claims = append(claims, block.Claims...)
events = append(events, blockEvents...)
}

return claims, bridges, nil
return events, nil
}

func (p *processor) getLastProcessedBlock(ctx context.Context) (uint64, error) {
Expand Down Expand Up @@ -133,13 +131,13 @@ func (p *processor) reorg(firstReorgedBlock uint64) error {
return tx.Commit()
}

func (p *processor) storeBridgeEvents(blockNum uint64, block bridgeEvents) error {
func (p *processor) storeBridgeEvents(blockNum uint64, events []BridgeEvent) error {
tx, err := p.db.BeginRw(context.Background())
if err != nil {
return err
}
if len(block.Bridges) > 0 || len(block.Claims) > 0 {
value, err := json.Marshal(block)
if len(events) > 0 {
value, err := json.Marshal(events)
if err != nil {
tx.Rollback()
return err
Expand Down
Loading

0 comments on commit 912b63e

Please sign in to comment.