Skip to content

Commit

Permalink
feat: align Develop with changes in Release/0.4.0 (#174)
Browse files Browse the repository at this point in the history
* feat: calculate acc input hash locally (#154)
  • Loading branch information
ToniRamirezM authored Nov 8, 2024
1 parent d9aa92a commit 61fe7f6
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 138 deletions.
135 changes: 113 additions & 22 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"net"
"strings"
Expand Down Expand Up @@ -58,11 +59,13 @@ type Aggregator struct {
cfg Config
logger *log.Logger

state StateInterface
etherman Etherman
ethTxManager EthTxManagerClient
l1Syncr synchronizer.Synchronizer
halted atomic.Bool
state StateInterface
etherman Etherman
ethTxManager EthTxManagerClient
l1Syncr synchronizer.Synchronizer
halted atomic.Bool
accInputHashes map[uint64]common.Hash
accInputHashesMutex *sync.Mutex

profitabilityChecker aggregatorTxProfitabilityChecker
timeSendFinalProof time.Time
Expand Down Expand Up @@ -155,6 +158,8 @@ func New(
etherman: etherman,
ethTxManager: ethTxManager,
l1Syncr: l1Syncr,
accInputHashes: make(map[uint64]common.Hash),
accInputHashesMutex: &sync.Mutex{},
profitabilityChecker: profitabilityChecker,
stateDBMutex: &sync.Mutex{},
timeSendFinalProofMutex: &sync.RWMutex{},
Expand All @@ -170,7 +175,7 @@ func New(
a.ctx, a.exit = context.WithCancel(a.ctx)
}

// Set function to handle the batches from the data stream
// Set function to handle events on L1
if !cfg.SyncModeOnlyEnabled {
a.l1Syncr.SetCallbackOnReorgDone(a.handleReorg)
a.l1Syncr.SetCallbackOnRollbackBatches(a.handleRollbackBatches)
Expand All @@ -179,6 +184,26 @@ func New(
return a, nil
}

func (a *Aggregator) getAccInputHash(batchNumber uint64) common.Hash {
a.accInputHashesMutex.Lock()
defer a.accInputHashesMutex.Unlock()
return a.accInputHashes[batchNumber]
}

func (a *Aggregator) setAccInputHash(batchNumber uint64, accInputHash common.Hash) {
a.accInputHashesMutex.Lock()
defer a.accInputHashesMutex.Unlock()
a.accInputHashes[batchNumber] = accInputHash
}

func (a *Aggregator) removeAccInputHashes(firstBatch, lastBatch uint64) {
a.accInputHashesMutex.Lock()
defer a.accInputHashesMutex.Unlock()
for i := firstBatch; i <= lastBatch; i++ {
delete(a.accInputHashes, i)
}
}

func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) {
a.logger.Warnf("Reorg detected, reorgData: %+v", reorgData)

Expand Down Expand Up @@ -219,13 +244,16 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat
a.logger.Warnf("Rollback batches event, rollbackBatchesData: %+v", rollbackData)

var err error
var accInputHash *common.Hash

// Get new last verified batch number from L1
lastVerifiedBatchNumber, err := a.etherman.GetLatestVerifiedBatchNum()
if err != nil {
a.logger.Errorf("Error getting latest verified batch number: %v", err)
}

a.logger.Infof("Last Verified Batch Number:%v", lastVerifiedBatchNumber)

// Check lastVerifiedBatchNumber makes sense
if err == nil && lastVerifiedBatchNumber > rollbackData.LastBatchNumber {
err = fmt.Errorf(
Expand All @@ -234,6 +262,17 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat
)
}

if err == nil {
accInputHash, err = a.getVerifiedBatchAccInputHash(a.ctx, lastVerifiedBatchNumber)
if err == nil {
a.accInputHashesMutex.Lock()
a.accInputHashes = make(map[uint64]common.Hash)
a.accInputHashesMutex.Unlock()
a.logger.Infof("Starting AccInputHash:%v", accInputHash.String())
a.setAccInputHash(lastVerifiedBatchNumber, *accInputHash)
}
}

// Delete wip proofs
if err == nil {
err = a.state.DeleteUngeneratedProofs(a.ctx, nil)
Expand Down Expand Up @@ -272,7 +311,6 @@ func (a *Aggregator) Start() error {
err := a.l1Syncr.Sync(true)
if err != nil {
a.logger.Fatalf("Failed to synchronize from L1: %v", err)

return err
}

Expand All @@ -297,19 +335,27 @@ func (a *Aggregator) Start() error {
healthService := newHealthChecker()
grpchealth.RegisterHealthServer(a.srv, healthService)

// Delete ungenerated recursive proofs
err = a.state.DeleteUngeneratedProofs(a.ctx, nil)
if err != nil {
return fmt.Errorf("failed to initialize proofs cache %w", err)
}

// Get last verified batch number to set the starting point for verifications
lastVerifiedBatchNumber, err := a.etherman.GetLatestVerifiedBatchNum()
if err != nil {
return err
}

// Delete ungenerated recursive proofs
err = a.state.DeleteUngeneratedProofs(a.ctx, nil)
a.logger.Infof("Last Verified Batch Number:%v", lastVerifiedBatchNumber)

accInputHash, err := a.getVerifiedBatchAccInputHash(a.ctx, lastVerifiedBatchNumber)
if err != nil {
return fmt.Errorf("failed to initialize proofs cache %w", err)
return err
}

a.logger.Infof("Last Verified Batch Number:%v", lastVerifiedBatchNumber)
a.logger.Infof("Starting AccInputHash:%v", accInputHash.String())
a.setAccInputHash(lastVerifiedBatchNumber, *accInputHash)

a.resetVerifyProofTime()

Expand Down Expand Up @@ -1006,6 +1052,15 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover ProverInterf
return true, nil
}

func (a *Aggregator) getVerifiedBatchAccInputHash(ctx context.Context, batchNumber uint64) (*common.Hash, error) {
accInputHash, err := a.etherman.GetBatchAccInputHash(ctx, batchNumber)
if err != nil {
return nil, err
}

return &accInputHash, nil
}

func (a *Aggregator) getAndLockBatchToProve(
ctx context.Context, prover ProverInterface,
) (*state.Batch, []byte, *state.Proof, error) {
Expand Down Expand Up @@ -1039,6 +1094,22 @@ func (a *Aggregator) getAndLockBatchToProve(

return nil, nil, nil, err
}

if proofExists {
accInputHash := a.getAccInputHash(batchNumberToVerify - 1)
if accInputHash == (common.Hash{}) && batchNumberToVerify > 1 {
tmpLogger.Warnf("AccInputHash for batch %d is not in memory, "+
"deleting proofs to regenerate acc input hash chain in memory", batchNumberToVerify)

err := a.state.CleanupGeneratedProofs(ctx, math.MaxInt, nil)
if err != nil {
tmpLogger.Infof("Error cleaning up generated proofs for batch %d", batchNumberToVerify)
return nil, nil, nil, err
}
batchNumberToVerify--
break
}
}
}

// Check if the batch has been sequenced
Expand Down Expand Up @@ -1092,15 +1163,37 @@ func (a *Aggregator) getAndLockBatchToProve(
virtualBatch.L1InfoRoot = &l1InfoRoot
}

// Calculate acc input hash as the RPC is not returning the correct one at the moment
accInputHash := cdkcommon.CalculateAccInputHash(
a.logger,
a.getAccInputHash(batchNumberToVerify-1),
virtualBatch.BatchL2Data,
*virtualBatch.L1InfoRoot,
uint64(sequence.Timestamp.Unix()),
rpcBatch.LastCoinbase(),
rpcBatch.ForcedBlockHashL1(),
)
// Store the acc input hash
a.setAccInputHash(batchNumberToVerify, accInputHash)

// Log params to calculate acc input hash
a.logger.Debugf("Calculated acc input hash for batch %d: %v", batchNumberToVerify, accInputHash)
a.logger.Debugf("L1InfoRoot: %v", virtualBatch.L1InfoRoot)
// a.logger.Debugf("LastL2BLockTimestamp: %v", rpcBatch.LastL2BLockTimestamp())
a.logger.Debugf("TimestampLimit: %v", uint64(sequence.Timestamp.Unix()))
a.logger.Debugf("LastCoinbase: %v", rpcBatch.LastCoinbase())
a.logger.Debugf("ForcedBlockHashL1: %v", rpcBatch.ForcedBlockHashL1())

// Create state batch
stateBatch := &state.Batch{
BatchNumber: rpcBatch.BatchNumber(),
Coinbase: rpcBatch.LastCoinbase(),
// Use L1 batch data
BatchL2Data: virtualBatch.BatchL2Data,
StateRoot: rpcBatch.StateRoot(),
LocalExitRoot: rpcBatch.LocalExitRoot(),
AccInputHash: rpcBatch.AccInputHash(),
BatchL2Data: virtualBatch.BatchL2Data,
StateRoot: rpcBatch.StateRoot(),
LocalExitRoot: rpcBatch.LocalExitRoot(),
// Use calculated acc input
AccInputHash: accInputHash,
L1InfoTreeIndex: rpcBatch.L1InfoTreeIndex(),
L1InfoRoot: *virtualBatch.L1InfoRoot,
Timestamp: time.Unix(int64(rpcBatch.LastL2BLockTimestamp()), 0),
Expand Down Expand Up @@ -1412,16 +1505,10 @@ func (a *Aggregator) buildInputProver(
}
}

// Get Old Acc Input Hash
rpcOldBatch, err := a.rpcClient.GetBatch(batchToVerify.BatchNumber - 1)
if err != nil {
return nil, err
}

inputProver := &prover.StatelessInputProver{
PublicInputs: &prover.StatelessPublicInputs{
Witness: witness,
OldAccInputHash: rpcOldBatch.AccInputHash().Bytes(),
OldAccInputHash: a.getAccInputHash(batchToVerify.BatchNumber - 1).Bytes(),
OldBatchNum: batchToVerify.BatchNumber - 1,
ChainId: batchToVerify.ChainID,
ForkId: batchToVerify.ForkID,
Expand Down Expand Up @@ -1521,6 +1608,10 @@ func (a *Aggregator) handleMonitoredTxResult(result ethtxtypes.MonitoredTxResult
}

mTxResultLogger.Debugf("deleted generated proofs from %d to %d", firstBatch, lastBatch)

// Remove the acc input hashes from the map
// leaving the last batch acc input hash as it will be used as old acc input hash
a.removeAccInputHashes(firstBatch, lastBatch-1)
}

func (a *Aggregator) cleanupLockedProofs() {
Expand Down
Loading

0 comments on commit 61fe7f6

Please sign in to comment.