Skip to content

Commit

Permalink
downnloader test wip
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar committed Jul 11, 2024
1 parent 8456a3a commit c855468
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 28 deletions.
17 changes: 12 additions & 5 deletions localbridgesync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@ func (d *downloader) download(ctx context.Context, fromBlock uint64, downloadedC
}

func (d *downloader) waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) {
attempts := 0
for {
lastBlock, err := d.ethClient.BlockNumber(ctx)
if err != nil {
attempts++
log.Error("error geting last block num from eth client: ", err)
time.Sleep(retryAfterErrorPeriod)
retryHandler("waitForNewBlocks", attempts)
continue
}
if lastBlock > lastBlockSeen {
Expand Down Expand Up @@ -141,10 +143,13 @@ func (d *downloader) getLogs(ctx context.Context, fromBlock, toBlock uint64) []t
},
ToBlock: new(big.Int).SetUint64(toBlock),
}
attempts := 0
for {
logs, err := d.ethClient.FilterLogs(ctx, query)
if err != nil {
attempts++
log.Error("error calling FilterLogs to eth client: ", err)
retryHandler("getLogs", attempts)
continue
}
return logs
Expand Down Expand Up @@ -202,12 +207,14 @@ func (d *downloader) appendLog(b *block, l types.Log) {
}
}

func (d *downloader) getBlockHeader(ctx context.Context, blokcNum uint64) blockHeader {
func (d *downloader) getBlockHeader(ctx context.Context, blockNum uint64) blockHeader {
attempts := 0
for {
header, err := d.ethClient.HeaderByNumber(ctx, big.NewInt(int64(blokcNum)))
header, err := d.ethClient.HeaderByNumber(ctx, big.NewInt(int64(blockNum)))
if err != nil {
log.Errorf("error getting block header for block %d, err: %v", blokcNum, err)
time.Sleep(retryAfterErrorPeriod)
attempts++
log.Errorf("error getting block header for block %d, err: %v", blockNum, err)
retryHandler("getBlockHeader", attempts)
continue
}
return blockHeader{
Expand Down
105 changes: 88 additions & 17 deletions localbridgesync/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"

"github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridge"
"github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridgev2"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
Expand All @@ -22,37 +24,64 @@ const (
)

func TestGetEventsByBlockRange(t *testing.T) {
type testCase struct {
inputLogs []types.Log
fromBlock, toBlock uint64
expectedBlocks []block
}
testCases := []testCase{}
clientMock := NewL2Mock(t)
ctx := context.Background()
d, err := newDownloader(contractAddr, syncBlockChunck, clientMock)
require.NoError(t, err)
log, bridge := generateRandomBridge(t)

// case 1
log, bridge := generateBridge(t, 1)
logs := []types.Log{
*log,
}
clientMock.
On("FilterLogs", mock.Anything, mock.Anything).
Return(logs, nil)
blocks := []block{
{
blockHeader: blockHeader{
Num: log.BlockNumber,
Hash: log.BlockHash,
},
Events: bridgeEvents{
Bridges: []Bridge{bridge},
Claims: []Claim{},
},
},
}
case1 := testCase{
inputLogs: logs,
fromBlock: 0,
toBlock: 1,
expectedBlocks: blocks,
}
testCases = append(testCases, case1)

blocks := d.getEventsByBlockRange(ctx, 0, 1)
b := blocks[0]
assert.Equal(t, log.BlockHash, b.Hash)
assert.Equal(t, log.BlockNumber, b.Num)
assert.Equal(t, bridge, b.Events.Bridges[0])
for _, tc := range testCases {
clientMock.
On("FilterLogs", mock.Anything, mock.Anything).
Return(tc.inputLogs, nil)

actualBlocks := d.getEventsByBlockRange(ctx, 0, 1)
assert.Equal(t, tc.expectedBlocks, actualBlocks)
}
}

func generateRandomBridge(t *testing.T) (*types.Log, Bridge) {
func generateBridge(t *testing.T, blockNum uint32) (*types.Log, Bridge) {
b := Bridge{
LeafType: 1,
OriginNetwork: 1,
OriginNetwork: blockNum,
OriginAddress: contractAddr,
DestinationNetwork: 1,
DestinationNetwork: blockNum,
DestinationAddress: contractAddr,
Amount: big.NewInt(1),
Amount: big.NewInt(int64(blockNum)),
Metadata: common.Hex2Bytes("01"),
DepositCount: 1,
DepositCount: blockNum,
}
abi, err := polygonzkevmbridge.PolygonzkevmbridgeMetaData.GetAbi()
abi, err := polygonzkevmbridgev2.Polygonzkevmbridgev2MetaData.GetAbi()
require.NoError(t, err)
event, err := abi.EventByID(bridgeEventSignature)
require.NoError(t, err)
Expand All @@ -69,10 +98,52 @@ func generateRandomBridge(t *testing.T) (*types.Log, Bridge) {
require.NoError(t, err)
log := &types.Log{
Address: contractAddr,
BlockNumber: 1,
BlockHash: common.HexToHash("01"),
BlockNumber: uint64(blockNum),
BlockHash: common.BytesToHash(blockNum2Bytes(uint64(blockNum))),
Topics: []common.Hash{bridgeEventSignature},
Data: data,
}
return log, b
}

func generateClaimV1(t *testing.T, blockNum uint32) (*types.Log, Claim) {
abi, err := polygonzkevmbridge.PolygonzkevmbridgeMetaData.GetAbi()
require.NoError(t, err)
event, err := abi.EventByID(claimEventSignaturePreEtrog)
require.NoError(t, err)
return generateClaim(t, blockNum, event)
}

func generateClaimV2(t *testing.T, blockNum uint32) (*types.Log, Claim) {
abi, err := polygonzkevmbridgev2.Polygonzkevmbridgev2MetaData.GetAbi()
require.NoError(t, err)
event, err := abi.EventByID(claimEventSignature)
require.NoError(t, err)
return generateClaim(t, blockNum, event)
}

func generateClaim(t *testing.T, blockNum uint32, event *abi.Event) (*types.Log, Claim) {
c := Claim{
GlobalIndex: big.NewInt(int64(blockNum)),
OriginNetwork: blockNum,
OriginAddress: contractAddr,
DestinationAddress: contractAddr,
Amount: big.NewInt(int64(blockNum)),
}
data, err := event.Inputs.Pack(
c.GlobalIndex,
c.OriginNetwork,
c.OriginAddress,
c.DestinationAddress,
c.Amount,
)
require.NoError(t, err)
log := &types.Log{
Address: contractAddr,
BlockNumber: uint64(blockNum),
BlockHash: common.BytesToHash(blockNum2Bytes(uint64(blockNum))),
Topics: []common.Hash{bridgeEventSignature},
Data: data,
}
return log, c
}
18 changes: 13 additions & 5 deletions localbridgesync/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package localbridgesync

import (
"context"
"time"

"github.com/0xPolygon/cdk/log"
"github.com/0xPolygon/cdk/reorgdetector"
Expand Down Expand Up @@ -35,13 +34,16 @@ func newDriver(
}

func (d *driver) Sync(ctx context.Context) {
attempts := 0
for {
lastProcessedBlock, err := d.processor.getLastProcessedBlock(ctx)
if err != nil {
attempts++
log.Error("error geting last processed block: ", err)
time.Sleep(retryAfterErrorPeriod)
retryHandler("Sync", attempts)
continue
}
attempts = 0
cancellableCtx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -62,20 +64,24 @@ func (d *driver) Sync(ctx context.Context) {
}

func (d *driver) handleNewBlock(ctx context.Context, b block) {
attempts := 0
for {
err := d.reorgDetector.AddBlockToTrack(ctx, reorgDetectorID, b.Num, b.Hash)
if err != nil {
attempts++
log.Errorf("error adding block %d to tracker: %v", b.Num, err)
time.Sleep(retryAfterErrorPeriod)
retryHandler("handleNewBlock", attempts)
continue
}
break
}
attempts = 0
for {
err := d.processor.storeBridgeEvents(b.Num, b.Events)
if err != nil {
attempts++
log.Errorf("error processing events for blcok %d, err: ", b.Num, err)
time.Sleep(retryAfterErrorPeriod)
retryHandler("handleNewBlock", attempts)
continue
}
break
Expand All @@ -92,14 +98,16 @@ func (d *driver) handleReorg(
_, ok = <-downloadCh
}
// handle reorg
attempts := 0
for {
err := d.processor.reorg(firstReorgedBlock)
if err != nil {
attempts++
log.Errorf(
"error processing reorg, last valid block %d, err: %v",
firstReorgedBlock, err,
)
time.Sleep(retryAfterErrorPeriod)
retryHandler("handleReorg", attempts)
continue
}
break
Expand Down
15 changes: 14 additions & 1 deletion localbridgesync/localbridgesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package localbridgesync
import (
"errors"
"time"

"github.com/0xPolygon/cdk/log"
)

const (
retryAfterErrorPeriod = time.Second * 10
retryAfterErrorPeriod = time.Second * 10
maxRetryAttemptsAfterError = 5
)

type LocalBridgeSync struct {
Expand All @@ -17,3 +20,13 @@ func New() (*LocalBridgeSync, error) {
// init driver, processor and downloader
return &LocalBridgeSync{}, errors.New("not implemented")
}

func retryHandler(funcName string, attempts int) {
if attempts >= maxRetryAttemptsAfterError {
log.Fatalf(
"%s failed too many times (%d)",
funcName, maxRetryAttemptsAfterError,
)
}
time.Sleep(retryAfterErrorPeriod)
}

0 comments on commit c855468

Please sign in to comment.