Skip to content

Commit

Permalink
complete driver test
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar committed Jul 12, 2024
1 parent 3d53c33 commit 153b976
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 171 deletions.
2 changes: 2 additions & 0 deletions localbridgesync/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func TestDownload(t *testing.T) {
}

func TestWaitForNewBlocks(t *testing.T) {
retryAfterErrorPeriod = time.Millisecond * 100
clientMock := NewL2Mock(t)
ctx := context.Background()
d, err := newDownloader(contractAddr, clientMock)
Expand All @@ -429,6 +430,7 @@ func TestWaitForNewBlocks(t *testing.T) {
}

func TestGetBlockHeader(t *testing.T) {
retryAfterErrorPeriod = time.Millisecond * 100
clientMock := NewL2Mock(t)
ctx := context.Background()
d, err := newDownloader(contractAddr, clientMock)
Expand Down
67 changes: 44 additions & 23 deletions localbridgesync/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/0xPolygon/cdk/log"
"github.com/0xPolygon/cdk/reorgdetector"
"github.com/ethereum/go-ethereum/common"
)

const (
Expand All @@ -13,16 +14,29 @@ const (
)

type driver struct {
reorgDetector *reorgdetector.ReorgDetector
reorgDetector reorgDetectorInterface
reorgSub *reorgdetector.Subscription
processor *processor
downloader *downloader
processor processorInterface
downloader downloaderInterface
}

type processorInterface interface {
getLastProcessedBlock(ctx context.Context) (uint64, error)
storeBridgeEvents(blockNum uint64, block bridgeEvents) error
reorg(firstReorgedBlock uint64) error
}

type reorgDetectorInterface interface {
Subscribe(id string) *reorgdetector.Subscription
AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash common.Hash) error
}

type downloadFn func(ctx context.Context, d downloaderInterface, fromBlock, syncBlockChunkSize uint64, downloadedCh chan block)

func newDriver(
reorgDetector *reorgdetector.ReorgDetector,
processor *processor,
downloader *downloader,
reorgDetector reorgDetectorInterface,
processor processorInterface,
downloader downloaderInterface,
) (*driver, error) {
reorgSub := reorgDetector.Subscribe(reorgDetectorID)
return &driver{
Expand All @@ -33,32 +47,39 @@ func newDriver(
}, nil
}

func (d *driver) Sync(ctx context.Context) {
attempts := 0
func (d *driver) sync(ctx context.Context, syncBlockChunkSize uint64, download downloadFn) {
reset:
var (
lastProcessedBlock uint64
attempts int
err error
)
for {
lastProcessedBlock, err := d.processor.getLastProcessedBlock(ctx)
lastProcessedBlock, err = d.processor.getLastProcessedBlock(ctx)
if err != nil {
attempts++
log.Error("error geting last processed block: ", err)
retryHandler("Sync", attempts)
continue
}
attempts = 0
cancellableCtx, cancel := context.WithCancel(ctx)
defer cancel()
break
}
cancellableCtx, cancel := context.WithCancel(ctx)
defer cancel()

// start downloading
downloadCh := make(chan block, downloadBufferSize)
go download(cancellableCtx, d.downloader, lastProcessedBlock, 100, downloadCh) // TODO: 100 should come from config
// start downloading
downloadCh := make(chan block, downloadBufferSize)
go download(cancellableCtx, d.downloader, lastProcessedBlock, syncBlockChunkSize, downloadCh)

for {
select {
case b := <-downloadCh:
d.handleNewBlock(ctx, b)
case firstReorgedBlock := <-d.reorgSub.FirstReorgedBlock:
d.handleReorg(cancel, downloadCh, firstReorgedBlock)
break
}
for {
select {
case b := <-downloadCh:
log.Debug("handleNewBlock")
d.handleNewBlock(ctx, b)
case firstReorgedBlock := <-d.reorgSub.FirstReorgedBlock:
log.Debug("handleReorg")
d.handleReorg(cancel, downloadCh, firstReorgedBlock)
goto reset
}
}
}
Expand Down
204 changes: 204 additions & 0 deletions localbridgesync/driver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package localbridgesync

import (
"context"
"errors"
"testing"
"time"

"github.com/0xPolygon/cdk/log"
"github.com/0xPolygon/cdk/reorgdetector"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)

func TestSync(t *testing.T) {
retryAfterErrorPeriod = time.Millisecond * 100
rdm := NewReorgDetectorMock(t)
pm := NewProcessorMock(t)
dm := NewDownloaderMock(t)
firstReorgedBlock := make(chan uint64)
reorgProcessed := make(chan bool)
rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{
FirstReorgedBlock: firstReorgedBlock,
ReorgProcessed: reorgProcessed,
})
driver, err := newDriver(rdm, pm, dm)
require.NoError(t, err)
ctx := context.Background()
expectedBlock1 := block{
blockHeader: blockHeader{
Num: 3,
Hash: common.HexToHash("03"),
},
}
expectedBlock2 := block{
blockHeader: blockHeader{
Num: 9,
Hash: common.HexToHash("09"),
},
}
reorg1Completed := false

mockDownload := func(
ctx context.Context,
d downloaderInterface,
fromBlock, syncBlockChunkSize uint64,
downloadedCh chan block,
) {
log.Info("entering mock loop")
for {
select {
case <-ctx.Done():
log.Info("closing channel")
close(downloadedCh)
return
default:
}
if reorg1Completed {
downloadedCh <- expectedBlock2
} else {
downloadedCh <- expectedBlock1
}
time.Sleep(100 * time.Millisecond)
}
}

// Mocking this actions, the driver should "store" all the blocks from the downloader
pm.On("getLastProcessedBlock", ctx).
Return(uint64(3), nil)
rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock1.Num, expectedBlock1.Hash).
Return(nil)
pm.On("storeBridgeEvents", expectedBlock1.Num, expectedBlock1.Events).
Return(nil)
rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock2.Num, expectedBlock2.Hash).
Return(nil)
pm.On("storeBridgeEvents", expectedBlock2.Num, expectedBlock2.Events).
Return(nil)
go driver.sync(ctx, syncBlockChunck, mockDownload)
time.Sleep(time.Millisecond * 200) // time to download expectedBlock1

// Trigger reorg 1
reorgedBlock1 := uint64(5)
pm.On("reorg", reorgedBlock1).Return(nil)
firstReorgedBlock <- reorgedBlock1
ok := <-reorgProcessed
require.True(t, ok)
reorg1Completed = true
time.Sleep(time.Millisecond * 200) // time to download expectedBlock2

// Trigger reorg 2: syncer restarts the porcess
reorgedBlock2 := uint64(7)
pm.On("reorg", reorgedBlock2).Return(nil)
firstReorgedBlock <- reorgedBlock2
ok = <-reorgProcessed
require.True(t, ok)
}

func TestHandleNewBlock(t *testing.T) {
retryAfterErrorPeriod = time.Millisecond * 100
rdm := NewReorgDetectorMock(t)
pm := NewProcessorMock(t)
dm := NewDownloaderMock(t)
rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{})
driver, err := newDriver(rdm, pm, dm)
require.NoError(t, err)
ctx := context.Background()

// happy path
b1 := block{
blockHeader: blockHeader{
Num: 1,
Hash: common.HexToHash("f00"),
},
}
rdm.
On("AddBlockToTrack", ctx, reorgDetectorID, b1.Num, b1.Hash).
Return(nil)
pm.On("storeBridgeEvents", b1.Num, b1.Events).
Return(nil)
driver.handleNewBlock(ctx, b1)

// reorg deteector fails once
b2 := block{
blockHeader: blockHeader{
Num: 2,
Hash: common.HexToHash("f00"),
},
}
rdm.
On("AddBlockToTrack", ctx, reorgDetectorID, b2.Num, b2.Hash).
Return(errors.New("foo")).Once()
rdm.
On("AddBlockToTrack", ctx, reorgDetectorID, b2.Num, b2.Hash).
Return(nil).Once()
pm.On("storeBridgeEvents", b2.Num, b2.Events).
Return(nil)
driver.handleNewBlock(ctx, b2)

// processor fails once
b3 := block{
blockHeader: blockHeader{
Num: 3,
Hash: common.HexToHash("f00"),
},
}
rdm.
On("AddBlockToTrack", ctx, reorgDetectorID, b3.Num, b3.Hash).
Return(nil)
pm.On("storeBridgeEvents", b3.Num, b3.Events).
Return(errors.New("foo")).Once()
pm.On("storeBridgeEvents", b3.Num, b3.Events).
Return(nil).Once()
driver.handleNewBlock(ctx, b3)

}

func TestHandleReorg(t *testing.T) {
retryAfterErrorPeriod = time.Millisecond * 100
rdm := NewReorgDetectorMock(t)
pm := NewProcessorMock(t)
dm := NewDownloaderMock(t)
reorgProcessed := make(chan bool)
rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{
ReorgProcessed: reorgProcessed,
})
driver, err := newDriver(rdm, pm, dm)
require.NoError(t, err)
ctx := context.Background()

// happy path
_, cancel := context.WithCancel(ctx)
downloadCh := make(chan block)
firstReorgedBlock := uint64(5)
pm.On("reorg", firstReorgedBlock).Return(nil)
go driver.handleReorg(cancel, downloadCh, firstReorgedBlock)
close(downloadCh)
done := <-reorgProcessed
require.True(t, done)

// download ch sends some garbage
_, cancel = context.WithCancel(ctx)
downloadCh = make(chan block)
firstReorgedBlock = uint64(6)
pm.On("reorg", firstReorgedBlock).Return(nil)
go driver.handleReorg(cancel, downloadCh, firstReorgedBlock)
downloadCh <- block{}
downloadCh <- block{}
downloadCh <- block{}
close(downloadCh)
done = <-reorgProcessed
require.True(t, done)

// processor fails 2 times
_, cancel = context.WithCancel(ctx)
downloadCh = make(chan block)
firstReorgedBlock = uint64(7)
pm.On("reorg", firstReorgedBlock).Return(errors.New("foo")).Once()
pm.On("reorg", firstReorgedBlock).Return(errors.New("foo")).Once()
pm.On("reorg", firstReorgedBlock).Return(nil).Once()
go driver.handleReorg(cancel, downloadCh, firstReorgedBlock)
close(downloadCh)
done = <-reorgProcessed
require.True(t, done)
}
2 changes: 1 addition & 1 deletion localbridgesync/localbridgesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/0xPolygon/cdk/log"
)

const (
var (
retryAfterErrorPeriod = time.Second * 10
maxRetryAttemptsAfterError = 5
)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 153b976

Please sign in to comment.