Commit 7fc033d

Merge remote-tracking branch 'origin/develop' into feature/CDK-385-2

rbpol committed Sep 2, 2024
2 parents 3bc8089 + 6113321
Showing 5 changed files with 264 additions and 33 deletions.
100 changes: 100 additions & 0 deletions .github/workflows/test-resequence.yml
@@ -0,0 +1,100 @@
name: Resequence test
on:
push:
branches:
# Disable test for the moment as it takes too long
- "this-test-is-disabled"


concurrency:
group: ${{ github.ref }}
cancel-in-progress: true

jobs:
Resequence:
runs-on: ubuntu-latest
# TODO: Add "cdk-validium" once it's ready
# strategy:
# matrix:
# da-mode: [ "rollup" ]
steps:
- name: Checkout cdk
uses: actions/checkout@v4
with:
path: cdk

- name: Checkout cdk-erigon
uses: actions/checkout@v4
with:
repository: 0xPolygonHermez/cdk-erigon
ref: banana
path: cdk-erigon

- name: Checkout kurtosis-cdk
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
ref: 3debe0a4dd000e02f7e6bde3247432211bf0336f
path: kurtosis-cdk

- name: Install Kurtosis CDK tools
uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk

- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1

- name: Install yq
run: |
sudo curl -L https://github.com/mikefarah/yq/releases/download/v4.44.2/yq_linux_amd64 -o /usr/local/bin/yq
sudo chmod +x /usr/local/bin/yq
/usr/local/bin/yq --version
- name: Install polycli
run: |
tmp_dir=$(mktemp -d) && curl -L https://github.com/0xPolygon/polygon-cli/releases/download/v0.1.48/polycli_v0.1.48_linux_amd64.tar.gz | tar -xz -C "$tmp_dir" && mv "$tmp_dir"/* /usr/local/bin/polycli && rm -rf "$tmp_dir"
sudo chmod +x /usr/local/bin/polycli
/usr/local/bin/polycli version
- name: Build docker image
working-directory: ./cdk
run: docker build -t cdk:local --file Dockerfile .

- name: Remove unused flags
working-directory: ./kurtosis-cdk
run: |
sed -i '/zkevm.sequencer-batch-seal-time:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.sequencer-non-empty-batch-seal-time:/d' templates/cdk-erigon/config.yml
- name: Configure Kurtosis CDK
working-directory: ./kurtosis-cdk
run: |
/usr/local/bin/yq -i '.args.cdk_erigon_node_image = "jerrycgh/cdk-erigon:d5d04906f723f3f1d8c43c9e6baf3e18c27ff348"' params.yml
/usr/local/bin/yq -i '.args.cdk_node_image = "cdk:local"' params.yml
- name: Deploy Kurtosis CDK package
working-directory: ./kurtosis-cdk
run: kurtosis run --enclave cdk-v1 --args-file params.yml --image-download always .

- name: Test resequence
working-directory: ./cdk-erigon
run: .github/scripts/test_resequence.sh

- name: Prepare logs
if: always()
working-directory: ./kurtosis-cdk
run: |
mkdir -p ci_logs
cd ci_logs
kurtosis service logs cdk-v1 cdk-erigon-node-001 --all > cdk-erigon-node-001.log
kurtosis service logs cdk-v1 cdk-erigon-sequencer-001 --all > cdk-erigon-sequencer-001.log
kurtosis service logs cdk-v1 zkevm-agglayer-001 --all > zkevm-agglayer-001.log
kurtosis service logs cdk-v1 zkevm-prover-001 --all > zkevm-prover-001.log
kurtosis service logs cdk-v1 cdk-node-001 --all > cdk-node-001.log
kurtosis service logs cdk-v1 zkevm-bridge-service-001 --all > zkevm-bridge-service-001.log
- name: Upload logs
if: always()
uses: actions/upload-artifact@v3
with:
name: logs_${{ github.run_id }}
path: ./kurtosis-cdk/ci_logs
181 changes: 156 additions & 25 deletions aggregator/aggregator.go
@@ -42,6 +42,7 @@ const (
dataStreamType = 1
mockedStateRoot = "0x090bcaf734c4f06c93954a827b45a6e8c67b8e0fd1e0a35a1c5982d6961828f9"
mockedLocalExitRoot = "0x17c04c3760510b48c6012742c540a81aba4bca2f78b9d14bfd2f123e2e53ea3e"
maxDBBigIntValue = 9223372036854775807
)
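The new `maxDBBigIntValue` constant is 9223372036854775807 = 2^63 − 1, i.e. `math.MaxInt64`, which is also the ceiling of a typical `BIGINT` database column; `handleRollbackBatches` below passes it to `DeleteGeneratedProofs` as an open-ended upper bound. A quick standalone check (standard library only):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	const maxDBBigIntValue = 9223372036854775807
	// 2^63 - 1: the largest signed 64-bit integer, and hence the largest
	// value a BIGINT column can store.
	fmt.Println(maxDBBigIntValue == math.MaxInt64) // true
}
```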

var (
@@ -68,6 +69,8 @@ type Aggregator struct {
l1Syncr synchronizer.Synchronizer
halted atomic.Bool

streamClientMutex *sync.Mutex

profitabilityChecker aggregatorTxProfitabilityChecker
timeSendFinalProof time.Time
timeCleanupLockedProofs types.Duration
@@ -171,11 +174,13 @@ func New(
}

a := &Aggregator{
ctx: ctx,
cfg: cfg,
state: stateInterface,
etherman: etherman,
ethTxManager: ethTxManager,
streamClient: streamClient,
streamClientMutex: &sync.Mutex{},
l1Syncr: l1Syncr,
profitabilityChecker: profitabilityChecker,
stateDBMutex: &sync.Mutex{},
@@ -188,15 +193,28 @@
witnessRetrievalChan: make(chan state.DBBatch),
}

if a.ctx == nil {
// context.WithCancel panics on a nil parent, so fall back to Background
a.ctx, a.exit = context.WithCancel(context.Background())
}

// Set function to handle the batches from the data stream
if !cfg.SyncModeOnlyEnabled {
a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream)
a.l1Syncr.SetCallbackOnReorgDone(a.handleReorg)
a.l1Syncr.SetCallbackOnRollbackBatches(a.handleRollbackBatches)
}

return a, nil
}

func (a *Aggregator) resetCurrentBatchData() {
a.currentBatchStreamData = []byte{}
a.currentStreamBatchRaw = state.BatchRawV2{
Blocks: make([]state.L2BlockRaw, 0),
}
a.currentStreamL2Block = state.L2BlockRaw{}
}

func (a *Aggregator) retrieveWitness() {
var success bool
for {
@@ -232,14 +250,12 @@ func (a *Aggregator) retrieveWitness() {
func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) {
log.Warnf("Reorg detected, reorgData: %+v", reorgData)

- ctx := context.Background()

// Get new latest verified batch number
- lastVBatchNumber, err := a.l1Syncr.GetLastestVirtualBatchNumber(ctx)
+ lastVBatchNumber, err := a.l1Syncr.GetLastestVirtualBatchNumber(a.ctx)
if err != nil {
log.Errorf("Error getting last virtual batch number: %v", err)
} else {
- err = a.state.DeleteBatchesNewerThanBatchNumber(ctx, lastVBatchNumber, nil)
+ err = a.state.DeleteBatchesNewerThanBatchNumber(a.ctx, lastVBatchNumber, nil)
if err != nil {
log.Errorf("Error deleting batches newer than batch number %d: %v", lastVBatchNumber, err)
}
@@ -248,11 +264,135 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) {
// Halt the aggregator
a.halted.Store(true)
for {
log.Warnf("Halting the aggregator due to a L1 reorg. Reorged data has been delete so it is safe to manually restart the aggregator.")
log.Errorf("Halting the aggregator due to a L1 reorg. Reorged data has been deleted so it is safe to manually restart the aggregator.")
time.Sleep(10 * time.Second) // nolint:gomnd
}
}
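`handleReorg` ends in the same halt pattern that `handleRollbackBatches` below uses: store `true` in the `halted` atomic flag so other goroutines stop picking up work, then loop forever logging so an operator notices and restarts the process manually. A minimal standalone sketch of the pattern, using the standard `log` package in place of the project's logger:

```go
package main

import (
	"errors"
	"log"
	"sync/atomic"
	"time"
)

// haltLoop mirrors the shutdown style used by the aggregator: set the flag,
// then log the reason every 10 seconds until the process is restarted.
func haltLoop(halted *atomic.Bool, reason error) {
	halted.Store(true)
	for {
		log.Printf("halted: %v", reason)
		time.Sleep(10 * time.Second)
	}
}

func main() {
	var halted atomic.Bool
	haltLoop(&halted, errors.New("L1 reorg detected"))
}
```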

func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBatchesData) {
log.Warnf("Rollback batches event, rollbackBatchesData: %+v", rollbackData)

a.streamClientMutex.Lock()
defer a.streamClientMutex.Unlock()

dsClientWasRunning := a.streamClient.IsStarted()

var err error

if dsClientWasRunning {
// Disable the process entry function to avoid processing the data stream
a.streamClient.ResetProcessEntryFunc()

// Stop reading the data stream
err = a.streamClient.ExecCommandStop()
if err != nil {
log.Errorf("failed to stop data stream: %v.", err)
} else {
log.Info("Data stream client stopped")
}
}

// Get new last verified batch number from L1
var lastVerifiedBatchNumber uint64
if err == nil {
lastVerifiedBatchNumber, err = a.etherman.GetLatestVerifiedBatchNum()
if err != nil {
log.Errorf("Error getting latest verified batch number: %v", err)
}
}

// Check lastVerifiedBatchNumber makes sense
if err == nil && lastVerifiedBatchNumber > rollbackData.LastBatchNumber {
err = fmt.Errorf("last verified batch number %d is greater than the last batch number %d in the rollback data", lastVerifiedBatchNumber, rollbackData.LastBatchNumber)
}

// Delete invalidated batches
if err == nil {
err = a.state.DeleteBatchesNewerThanBatchNumber(a.ctx, rollbackData.LastBatchNumber, nil)
if err != nil {
log.Errorf("Error deleting batches newer than batch number %d: %v", rollbackData.LastBatchNumber, err)
} else {
log.Infof("Deleted batches newer than batch number %d", rollbackData.LastBatchNumber)
}
}

// Data for older batches can also be deleted
if err == nil {
err = a.state.DeleteBatchesOlderThanBatchNumber(a.ctx, rollbackData.LastBatchNumber, nil)
if err != nil {
log.Errorf("Error deleting batches older than batch number %d: %v", rollbackData.LastBatchNumber, err)
} else {
log.Infof("Deleted batches older than batch number %d", rollbackData.LastBatchNumber)
}
}

// Delete wip proofs
if err == nil {
err = a.state.DeleteUngeneratedProofs(a.ctx, nil)
if err != nil {
log.Errorf("Error deleting ungenerated proofs: %v", err)
} else {
log.Info("Deleted ungenerated proofs")
}
}

// Delete any proof for the batches that have been rolled back
if err == nil {
err = a.state.DeleteGeneratedProofs(a.ctx, rollbackData.LastBatchNumber+1, maxDBBigIntValue, nil)
if err != nil {
log.Errorf("Error deleting generated proofs: %v", err)
} else {
log.Infof("Deleted generated proofs for batches newer than %d", rollbackData.LastBatchNumber)
}
}

if err == nil {
// Reset current batch data previously read from the data stream
a.resetCurrentBatchData()
a.currentStreamBatch = state.Batch{}
log.Info("Current batch data reset")

var marshalledBookMark []byte
// Reset the data stream reading point
bookMark := &datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: rollbackData.LastBatchNumber + 1,
}

marshalledBookMark, err = proto.Marshal(bookMark)
if err != nil {
log.Error("failed to marshal bookmark: %v", err)
} else {
// Restart the stream client if needed
if dsClientWasRunning {
a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream)
err = a.streamClient.Start()
if err != nil {
log.Errorf("failed to start stream client, error: %v", err)
} else {
// Resume data stream reading
err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark)
if err != nil {
log.Errorf("failed to connect to data stream: %v", err)
} else {
log.Info("Data stream client resumed")
}
}
}
}
}

if err == nil {
log.Info("Handling rollback batches event finished successfully")
} else {
// Halt the aggregator
a.halted.Store(true)
for {
log.Errorf("Halting the aggregator due to an error handling rollback batches event: %v", err)
time.Sleep(10 * time.Second) // nolint:gomnd
}
}
}
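The function above threads a single `err` through every step with repeated `if err == nil` guards: stop the stream client, validate the rollback point, prune batches and proofs, reset in-memory state, and restart the stream client, with one final check deciding between the success log and the halt loop. A compressed standalone sketch of that control flow (the step functions are placeholders, not the aggregator's real calls):

```go
package main

import (
	"errors"
	"fmt"
)

func stopStream() error    { return nil }
func pruneBatches() error  { return errors.New("db unavailable") }
func restartStream() error { return nil }

func main() {
	var err error
	err = stopStream()
	if err == nil {
		err = pruneBatches() // fails here, so restartStream is skipped
	}
	if err == nil {
		err = restartStream()
	}
	if err == nil {
		fmt.Println("handling rollback batches event finished successfully")
	} else {
		fmt.Println("halting:", err) // the real code loops forever, logging
	}
}
```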

func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, client *datastreamer.StreamClient, server *datastreamer.StreamServer) error {
forcedBlockhashL1 := common.Hash{}

@@ -421,11 +561,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli
}

// Reset current batch data
- a.currentBatchStreamData = []byte{}
- a.currentStreamBatchRaw = state.BatchRawV2{
- Blocks: make([]state.L2BlockRaw, 0),
- }
- a.currentStreamL2Block = state.L2BlockRaw{}
+ a.resetCurrentBatchData()

case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK):
// Add previous block (if any) to the current batch
@@ -479,15 +615,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli
}

// Start starts the aggregator
- func (a *Aggregator) Start(ctx context.Context) error {
- var cancel context.CancelFunc
- if ctx == nil {
- ctx = context.Background()
- }
- ctx, cancel = context.WithCancel(ctx)
- a.ctx = ctx
- a.exit = cancel
-
+ func (a *Aggregator) Start() error {
// Initial L1 Sync blocking
err := a.l1Syncr.Sync(true)
if err != nil {
@@ -523,18 +651,18 @@ func (a *Aggregator) Start(ctx context.Context) error {
}

// Cleanup data base
- err = a.state.DeleteBatchesOlderThanBatchNumber(ctx, lastVerifiedBatchNumber, nil)
+ err = a.state.DeleteBatchesOlderThanBatchNumber(a.ctx, lastVerifiedBatchNumber, nil)
if err != nil {
return err
}

// Delete ungenerated recursive proofs
- err = a.state.DeleteUngeneratedProofs(ctx, nil)
+ err = a.state.DeleteUngeneratedProofs(a.ctx, nil)
if err != nil {
return fmt.Errorf("failed to initialize proofs cache %w", err)
}

- accInputHash, err := a.getVerifiedBatchAccInputHash(ctx, lastVerifiedBatchNumber)
+ accInputHash, err := a.getVerifiedBatchAccInputHash(a.ctx, lastVerifiedBatchNumber)
if err != nil {
return err
}
@@ -544,7 +672,7 @@ func (a *Aggregator) Start(ctx context.Context) error {

// Store Acc Input Hash of the latest verified batch
dummyDBBatch := state.DBBatch{Batch: state.Batch{BatchNumber: lastVerifiedBatchNumber, AccInputHash: *accInputHash}, Datastream: []byte{0}, Witness: []byte{0}}
- err = a.state.AddBatch(ctx, &dummyDBBatch, nil)
+ err = a.state.AddBatch(a.ctx, &dummyDBBatch, nil)
if err != nil {
return err
}
@@ -561,6 +689,9 @@ func (a *Aggregator) Start(ctx context.Context) error {
}

// Start stream client
a.streamClientMutex.Lock()
defer a.streamClientMutex.Unlock()

err = a.streamClient.Start()
if err != nil {
log.Fatalf("failed to start stream client, error: %v", err)
@@ -591,8 +722,8 @@ func (a *Aggregator) Start(ctx context.Context) error {
}()
}

<-ctx.Done()
return ctx.Err()
<-a.ctx.Done()
return a.ctx.Err()
}
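The net effect of this hunk and the constructor change above is an API change: the cancellable context now lives on the struct, created in `New`, and `Start` takes no arguments. A standalone sketch of the new shape with a toy type (the real `New` takes many more parameters):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Toy stand-in showing the new lifecycle: New owns the cancellable context,
// Start blocks on it, Stop cancels it.
type Aggregator struct {
	ctx  context.Context
	exit context.CancelFunc
}

func New(ctx context.Context) *Aggregator {
	if ctx == nil {
		ctx = context.Background()
	}
	ctx, cancel := context.WithCancel(ctx)
	return &Aggregator{ctx: ctx, exit: cancel}
}

func (a *Aggregator) Start() error { // was: Start(ctx context.Context) error
	<-a.ctx.Done()
	return a.ctx.Err()
}

func (a *Aggregator) Stop() { a.exit() }

func main() {
	a := New(context.Background())
	go func() { time.Sleep(100 * time.Millisecond); a.Stop() }()
	fmt.Println(a.Start()) // prints "context canceled"
}
```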

// Stop stops the Aggregator server.
(The remaining 3 changed files are not rendered here.)
