Skip to content

Commit

Permalink
chore: sequence sender improvements (#117)
Browse files Browse the repository at this point in the history
* chore: reduce map indexing in syncEthTxResults and propagate an error

* chore: handle the context.Done and rely on ticker instead of time.Sleep in sequenceSending fn

* chore: introduce IsStopped func

* fix: failing unit tests

* chore: remove the mutex from updateLatestVirtualBatch

* chore: simplify the for loop when iterating through batches

* chore: simplify the marginTimeElapsed func

* chore: simplify waiting for time margin to elapse

* chore: simplify purge logic

* fix: wait for margin for latest l1 block

---------

Co-authored-by: Goran Rojovic <[email protected]>
  • Loading branch information
Stefan-Ethernal and goran-ethernal authored Oct 16, 2024
1 parent 8e2015f commit 549b102
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 137 deletions.
26 changes: 17 additions & 9 deletions sequencesender/ethtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *SequenceSender) sendTx(ctx context.Context, resend bool, txOldHash *com
// purgeEthTx purges transactions from memory structures
func (s *SequenceSender) purgeEthTx(ctx context.Context) {
// If sequence sending is stopped, do not purge
if atomic.LoadUint32(&s.seqSendingStopped) == 1 {
if s.IsStopped() {
return
}

Expand Down Expand Up @@ -162,28 +162,30 @@ func (s *SequenceSender) purgeEthTx(ctx context.Context) {
}

// syncEthTxResults syncs results from L1 for transactions in the memory structure
func (s *SequenceSender) syncEthTxResults(ctx context.Context) (uint64, error) { //nolint:unparam
func (s *SequenceSender) syncEthTxResults(ctx context.Context) (uint64, error) {
s.mutexEthTx.Lock()
var (
txPending uint64
txSync uint64
)
for hash, data := range s.ethTransactions {
if data.Status == types.MonitoredTxStatusFinalized.String() {
for hash, tx := range s.ethTransactions {
if tx.Status == types.MonitoredTxStatusFinalized.String() {
continue
}

err := s.getResultAndUpdateEthTx(ctx, hash)
if err != nil {
log.Errorf("error getting result for tx %v: %v", hash, err)
return 0, err
}

txSync++
txStatus := s.ethTransactions[hash].Status
txStatus := types.MonitoredTxStatus(tx.Status)
// Count if it is not in a final state
if s.ethTransactions[hash].OnMonitor &&
txStatus != types.MonitoredTxStatusFailed.String() &&
txStatus != types.MonitoredTxStatusSafe.String() &&
txStatus != types.MonitoredTxStatusFinalized.String() {
if tx.OnMonitor &&
txStatus != types.MonitoredTxStatusFailed &&
txStatus != types.MonitoredTxStatusSafe &&
txStatus != types.MonitoredTxStatusFinalized {
txPending++
}
}
Expand All @@ -193,6 +195,7 @@ func (s *SequenceSender) syncEthTxResults(ctx context.Context) (uint64, error) {
err := s.saveSentSequencesTransactions(ctx)
if err != nil {
log.Errorf("error saving tx sequence, error: %v", err)
return 0, err
}

log.Infof("%d tx results synchronized (%d in pending state)", txSync, txPending)
Expand Down Expand Up @@ -388,3 +391,8 @@ func (s *SequenceSender) saveSentSequencesTransactions(ctx context.Context) erro

return nil
}

// IsStopped returns true in case seqSendingStopped is set to 1, otherwise false
func (s *SequenceSender) IsStopped() bool {
return atomic.LoadUint32(&s.seqSendingStopped) == 1
}
Loading

0 comments on commit 549b102

Please sign in to comment.