diff --git a/tx-submitter/services/rollup.go b/tx-submitter/services/rollup.go index c7180d3a..96e9c78a 100644 --- a/tx-submitter/services/rollup.go +++ b/tx-submitter/services/rollup.go @@ -225,6 +225,8 @@ func (r *Rollup) Start() error { func (r *Rollup) ProcessTx() error { + // case 0: batch has committed + // -> remove from local pool // case 1: in mempool // -> check timeout // case 2: no in mempool @@ -233,6 +235,15 @@ func (r *Rollup) ProcessTx() error { // case 2.3: tx included -> failed // -> reset index to failed index + // if this submitter work + cur, err := r.rotator.CurrentSubmitter(r.L2Clients, r.Staking) + if err != nil { + return fmt.Errorf("rollup: get current submitter err, %w", err) + } + if cur.Hex() != r.WalletAddr().Hex() { + log.Info("wait my turn to process tx") + return nil + } // get all local txs txRecords := r.pendingTxs.GetAll() if len(txRecords) == 0 { @@ -241,9 +252,27 @@ func (r *Rollup) ProcessTx() error { // query tx status for _, txRecord := range txRecords { - + // parse tx rtx := txRecord.tx method := utils.ParseMethod(rtx, r.abi) + if method == "commitBatch" { + // get latest rolluped batch index + cindexBig, err := r.Rollup.LastCommittedBatchIndex(nil) + if err != nil { + return fmt.Errorf("get last committed batch index error:%v", err) + } + batchIndex := utils.ParseParentBatchIndex(rtx.Data()) + if batchIndex <= cindexBig.Uint64() { + log.Info("batch has committed remove batch tx from local pool", + "cur_batch_index", batchIndex, + "latest_committed_batch_index", cindexBig, + ) + r.pendingTxs.Remove(rtx.Hash()) + continue + } + + } + log.Info("process tx", "hash", rtx.Hash().String(), "nonce", rtx.Nonce(), "method", method) // query tx _, ispending, err := r.L1Client.TransactionByHash(context.Background(), txRecord.tx.Hash())