From b3c0976a2930c6f60ae13f099f666227a3ad7b23 Mon Sep 17 00:00:00 2001 From: bros Date: Mon, 8 Jul 2024 15:11:34 +0000 Subject: [PATCH] complete processor testing --- localbridgesync/processor.go | 69 +++++++++---- localbridgesync/processor_test.go | 154 ++++++++++++++++++++++++++---- 2 files changed, 186 insertions(+), 37 deletions(-) diff --git a/localbridgesync/processor.go b/localbridgesync/processor.go index 0c4134fe..f93ef3e6 100644 --- a/localbridgesync/processor.go +++ b/localbridgesync/processor.go @@ -11,11 +11,13 @@ import ( ) const ( - eventsTable = "localbridgesync-events" + eventsTable = "localbridgesync-events" + lastBlockTable = "localbridgesync-lastBlock" ) var ( ErrBlockNotProcessed = errors.New("given block(s) have not been processed yet") + lastBlokcKey = []byte("lb") ) type processor struct { @@ -24,7 +26,8 @@ type processor struct { func tableCfgFunc(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TableCfg{ - eventsTable: {}, + eventsTable: {}, + lastBlockTable: {}, } } @@ -41,11 +44,11 @@ func newProcessor(dbPath string) (*processor, error) { }, nil } -// GetClaimsAndBridges returns the claims and bridges occurred between fromBlock, toBlock both included +// GetClaimsAndBridges returns the claims and bridges occurred between fromBlock, toBlock both included. +// If toBlock has not been porcessed yet, ErrBlockNotProcessed will be returned func (p *processor) GetClaimsAndBridges( ctx context.Context, fromBlock, toBlock uint64, ) ([]Claim, []Bridge, error) { - // TODO: if toBlock is not yet synced, return error, however we do not store blocks if they have no events :(? claims := []Claim{} bridges := []Bridge{} @@ -54,17 +57,21 @@ func (p *processor) GetClaimsAndBridges( return nil, nil, err } defer tx.Rollback() + lpb, err := p.getLastProcessedBlockWithTx(tx) + if lpb < toBlock { + return nil, nil, ErrBlockNotProcessed + } c, err := tx.Cursor(eventsTable) if err != nil { return nil, nil, err } defer c.Close() - for k, v, err := c.Seek(blockNum2Key(fromBlock)); k != nil; k, v, err = c.Next() { + for k, v, err := c.Seek(blockNum2Bytes(fromBlock)); k != nil; k, v, err = c.Next() { if err != nil { return nil, nil, err } - if key2BlockNum(k) > toBlock { + if bytes2BlockNum(k) > toBlock { break } block := bridgeEvents{} @@ -80,7 +87,22 @@ func (p *processor) GetClaimsAndBridges( } func (p *processor) getLastProcessedBlock(ctx context.Context) (uint64, error) { - return 0, errors.New("not implemented") + tx, err := p.db.BeginRo(ctx) + if err != nil { + return 0, err + } + defer tx.Rollback() + return p.getLastProcessedBlockWithTx(tx) +} + +func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) { + if blockNumBytes, err := tx.GetOne(lastBlockTable, lastBlokcKey); err != nil { + return 0, err + } else if blockNumBytes == nil { + return 0, nil + } else { + return bytes2BlockNum(blockNumBytes), nil + } } func (p *processor) reorg(firstReorgedBlock uint64) error { @@ -93,7 +115,7 @@ func (p *processor) reorg(firstReorgedBlock uint64) error { return err } defer c.Close() - firstKey := blockNum2Key(firstReorgedBlock) + firstKey := blockNum2Bytes(firstReorgedBlock) for k, _, err := c.Seek(firstKey); k != nil; k, _, err = c.Next() { if err != nil { tx.Rollback() @@ -104,32 +126,47 @@ func (p *processor) reorg(firstReorgedBlock uint64) error { return err } } + if err := p.updateLastProcessedBlock(tx, firstReorgedBlock-1); err != nil { + tx.Rollback() + return err + } return tx.Commit() } func (p *processor) storeBridgeEvents(blockNum uint64, block bridgeEvents) error { - // TODO: add logic to store last processed block even if there are no events - value, err := json.Marshal(block) - if err != nil { - return err - } tx, err := p.db.BeginRw(context.Background()) if err != nil { return err } - if err := tx.Put(eventsTable, blockNum2Key(blockNum), value); err != nil { + if len(block.Bridges) > 0 || len(block.Claims) > 0 { + value, err := json.Marshal(block) + if err != nil { + tx.Rollback() + return err + } + if err := tx.Put(eventsTable, blockNum2Bytes(blockNum), value); err != nil { + tx.Rollback() + return err + } + } + if err := p.updateLastProcessedBlock(tx, blockNum); err != nil { tx.Rollback() return err } return tx.Commit() } -func blockNum2Key(blockNum uint64) []byte { +func (p *processor) updateLastProcessedBlock(tx kv.RwTx, blockNum uint64) error { + blockNumBytes := blockNum2Bytes(blockNum) + return tx.Put(lastBlockTable, lastBlokcKey, blockNumBytes) +} + +func blockNum2Bytes(blockNum uint64) []byte { key := make([]byte, 8) binary.LittleEndian.PutUint64(key, blockNum) return key } -func key2BlockNum(key []byte) uint64 { +func bytes2BlockNum(key []byte) uint64 { return binary.LittleEndian.Uint64(key) } diff --git a/localbridgesync/processor_test.go b/localbridgesync/processor_test.go index 14db8627..0c446d3d 100644 --- a/localbridgesync/processor_test.go +++ b/localbridgesync/processor_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "slices" "testing" "github.com/ethereum/go-ethereum/common" @@ -19,6 +20,7 @@ func TestProceessor(t *testing.T) { &getLastProcessedBlockAction{ p: p, description: "on an empty processor", + ctx: context.Background(), expectedLastProcessedBlock: 0, expectedErr: nil, }, @@ -40,8 +42,8 @@ func TestProceessor(t *testing.T) { ctx: context.Background(), fromBlock: 0, toBlock: 2, - expectedClaims: []Claim{}, - expectedBridges: []Bridge{}, + expectedClaims: nil, + expectedBridges: nil, expectedErr: ErrBlockNotProcessed, }, &storeBridgeEventsAction{ @@ -55,6 +57,7 @@ func TestProceessor(t *testing.T) { &getLastProcessedBlockAction{ p: p, description: "after block1", + ctx: context.Background(), expectedLastProcessedBlock: 1, expectedErr: nil, }, @@ -64,13 +67,13 @@ func TestProceessor(t *testing.T) { ctx: context.Background(), fromBlock: 0, toBlock: 2, - expectedClaims: block1.Events.Claims, - expectedBridges: block1.Events.Bridges, - expectedErr: nil, + expectedClaims: nil, + expectedBridges: nil, + expectedErr: ErrBlockNotProcessed, }, &getClaimsAndBridgesAction{ p: p, - description: "after block1: range 0, 1", + description: "after block1: range 1, 1", ctx: context.Background(), fromBlock: 1, toBlock: 1, @@ -78,16 +81,6 @@ func TestProceessor(t *testing.T) { expectedBridges: block1.Events.Bridges, expectedErr: nil, }, - &getClaimsAndBridgesAction{ - p: p, - description: "after block1: range 2, 2", - ctx: context.Background(), - fromBlock: 2, - toBlock: 2, - expectedClaims: []Claim{}, - expectedBridges: []Bridge{}, - expectedErr: ErrBlockNotProcessed, - }, &reorgAction{ p: p, description: "after block1", @@ -101,8 +94,8 @@ func TestProceessor(t *testing.T) { ctx: context.Background(), fromBlock: 0, toBlock: 2, - expectedClaims: []Claim{}, - expectedBridges: []Bridge{}, + expectedClaims: nil, + expectedBridges: nil, expectedErr: ErrBlockNotProcessed, }, &storeBridgeEventsAction{ @@ -120,7 +113,14 @@ func TestProceessor(t *testing.T) { block: block3.Events, expectedErr: nil, }, - // processed: block1, block2 + // processed: block1, block3 + &getLastProcessedBlockAction{ + p: p, + description: "after block3", + ctx: context.Background(), + expectedLastProcessedBlock: 3, + expectedErr: nil, + }, &getClaimsAndBridgesAction{ p: p, description: "after block3: range 2, 2", @@ -131,8 +131,114 @@ func TestProceessor(t *testing.T) { expectedBridges: []Bridge{}, expectedErr: nil, }, - - // TODO: keep going! + &getClaimsAndBridgesAction{ + p: p, + description: "after block3: range 1, 3", + ctx: context.Background(), + fromBlock: 1, + toBlock: 3, + expectedClaims: append(block1.Events.Claims, block3.Events.Claims...), + expectedBridges: append(block1.Events.Bridges, block3.Events.Bridges...), + expectedErr: nil, + }, + &reorgAction{ + p: p, + description: "after block3, with value 3", + firstReorgedBlock: 3, + expectedErr: nil, + }, + // processed: block1 + &getLastProcessedBlockAction{ + p: p, + description: "after block3 reorged", + ctx: context.Background(), + expectedLastProcessedBlock: 2, + expectedErr: nil, + }, + &reorgAction{ + p: p, + description: "after block3, with value 2", + firstReorgedBlock: 2, + expectedErr: nil, + }, + &getLastProcessedBlockAction{ + p: p, + description: "after block2 reorged", + ctx: context.Background(), + expectedLastProcessedBlock: 1, + expectedErr: nil, + }, + &storeBridgeEventsAction{ + p: p, + description: "block3 after reorg", + blockNum: block3.Num, + block: block3.Events, + expectedErr: nil, + }, + // processed: block1, block3 + &storeBridgeEventsAction{ + p: p, + description: "block4", + blockNum: block4.Num, + block: block4.Events, + expectedErr: nil, + }, + // processed: block1, block3, block4 + &storeBridgeEventsAction{ + p: p, + description: "block5", + blockNum: block5.Num, + block: block5.Events, + expectedErr: nil, + }, + // processed: block1, block3, block4, block5 + &getLastProcessedBlockAction{ + p: p, + description: "after block5", + ctx: context.Background(), + expectedLastProcessedBlock: 5, + expectedErr: nil, + }, + &getClaimsAndBridgesAction{ + p: p, + description: "after block5: range 1, 3", + ctx: context.Background(), + fromBlock: 1, + toBlock: 3, + expectedClaims: append(block1.Events.Claims, block3.Events.Claims...), + expectedBridges: append(block1.Events.Bridges, block3.Events.Bridges...), + expectedErr: nil, + }, + &getClaimsAndBridgesAction{ + p: p, + description: "after block5: range 4, 5", + ctx: context.Background(), + fromBlock: 4, + toBlock: 5, + expectedClaims: append(block4.Events.Claims, block5.Events.Claims...), + expectedBridges: append(block4.Events.Bridges, block5.Events.Bridges...), + expectedErr: nil, + }, + &getClaimsAndBridgesAction{ + p: p, + description: "after block5: range 0, 5", + ctx: context.Background(), + fromBlock: 0, + toBlock: 5, + expectedClaims: slices.Concat( + block1.Events.Claims, + block3.Events.Claims, + block4.Events.Claims, + block5.Events.Claims, + ), + expectedBridges: slices.Concat( + block1.Events.Bridges, + block3.Events.Bridges, + block4.Events.Bridges, + block5.Events.Bridges, + ), + expectedErr: nil, + }, } for _, a := range actions { @@ -202,6 +308,7 @@ var ( DepositCount: 3, }, }, + Claims: []Claim{}, }, } block4 = block{ @@ -209,6 +316,10 @@ var ( Num: 4, Hash: common.HexToHash("03"), }, + Events: bridgeEvents{ + Bridges: []Bridge{}, + Claims: []Claim{}, + }, } block5 = block{ blockHeader: blockHeader{ @@ -216,6 +327,7 @@ var ( Hash: common.HexToHash("04"), }, Events: bridgeEvents{ + Bridges: []Bridge{}, Claims: []Claim{ { GlobalIndex: big.NewInt(4),