Skip to content

Commit

Permalink
Merge pull request #2651 from OffchainLabs/delayed-seq-missing-messages
Browse files Browse the repository at this point in the history
Fix the delayed sequencer missing messages in tests
  • Loading branch information
tsahee authored Dec 17, 2024
2 parents e24b5e8 + 6eca766 commit 36ac667
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
56 changes: 40 additions & 16 deletions arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"math/big"
"sync"
"time"

flag "github.com/spf13/pflag"

Expand All @@ -30,16 +31,17 @@ type DelayedSequencer struct {
reader *InboxReader
exec execution.ExecutionSequencer
coordinator *SeqCoordinator
waitingForFinalizedBlock uint64
waitingForFinalizedBlock *uint64
mutex sync.Mutex
config DelayedSequencerConfigFetcher
}

type DelayedSequencerConfig struct {
Enable bool `koanf:"enable" reload:"hot"`
FinalizeDistance int64 `koanf:"finalize-distance" reload:"hot"`
RequireFullFinality bool `koanf:"require-full-finality" reload:"hot"`
UseMergeFinality bool `koanf:"use-merge-finality" reload:"hot"`
Enable bool `koanf:"enable" reload:"hot"`
FinalizeDistance int64 `koanf:"finalize-distance" reload:"hot"`
RequireFullFinality bool `koanf:"require-full-finality" reload:"hot"`
UseMergeFinality bool `koanf:"use-merge-finality" reload:"hot"`
RescanInterval time.Duration `koanf:"rescan-interval" reload:"hot"`
}

type DelayedSequencerConfigFetcher func() *DelayedSequencerConfig
Expand All @@ -49,20 +51,23 @@ func DelayedSequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int64(prefix+".finalize-distance", DefaultDelayedSequencerConfig.FinalizeDistance, "how many blocks in the past L1 block is considered final (ignored when using Merge finality)")
f.Bool(prefix+".require-full-finality", DefaultDelayedSequencerConfig.RequireFullFinality, "whether to wait for full finality before sequencing delayed messages")
f.Bool(prefix+".use-merge-finality", DefaultDelayedSequencerConfig.UseMergeFinality, "whether to use The Merge's notion of finality before sequencing delayed messages")
f.Duration(prefix+".rescan-interval", DefaultDelayedSequencerConfig.RescanInterval, "frequency to rescan for new delayed messages (the parent chain reader's poll-interval config is more important than this)")
}

var DefaultDelayedSequencerConfig = DelayedSequencerConfig{
Enable: false,
FinalizeDistance: 20,
RequireFullFinality: false,
UseMergeFinality: true,
RescanInterval: time.Second,
}

var TestDelayedSequencerConfig = DelayedSequencerConfig{
Enable: true,
FinalizeDistance: 20,
RequireFullFinality: false,
UseMergeFinality: false,
RescanInterval: time.Millisecond * 100,
}

func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher) (*DelayedSequencer, error) {
Expand Down Expand Up @@ -126,13 +131,12 @@ func (d *DelayedSequencer) sequenceWithoutLockout(ctx context.Context, lastBlock
finalized = uint64(currentNum - config.FinalizeDistance)
}

if d.waitingForFinalizedBlock > finalized {
if d.waitingForFinalizedBlock != nil && *d.waitingForFinalizedBlock > finalized {
return nil
}

// Unless we find an unfinalized message (which sets waitingForBlock),
// we won't find a new finalized message until FinalizeDistance blocks in the future.
d.waitingForFinalizedBlock = lastBlockHeader.Number.Uint64() + 1
// Reset what block we're waiting for if we've caught up
d.waitingForFinalizedBlock = nil

dbDelayedCount, err := d.inbox.GetDelayedCount()
if err != nil {
Expand All @@ -153,8 +157,8 @@ func (d *DelayedSequencer) sequenceWithoutLockout(ctx context.Context, lastBlock
return err
}
if parentChainBlockNumber > finalized {
// Message isn't finalized yet; stop here
d.waitingForFinalizedBlock = parentChainBlockNumber
// Message isn't finalized yet; wait for it to be
d.waitingForFinalizedBlock = &parentChainBlockNumber
break
}
if lastDelayedAcc != (common.Hash{}) {
Expand Down Expand Up @@ -216,20 +220,40 @@ func (d *DelayedSequencer) run(ctx context.Context) {
headerChan, cancel := d.l1Reader.Subscribe(false)
defer cancel()

latestHeader, err := d.l1Reader.LastHeader(ctx)
if err != nil {
log.Warn("delayed sequencer: failed to get latest header", "err", err)
latestHeader = nil
}
rescanTimer := time.NewTimer(d.config().RescanInterval)
for {
if !rescanTimer.Stop() {
select {
case <-rescanTimer.C:
default:
}
}
if latestHeader != nil {
rescanTimer.Reset(d.config().RescanInterval)
}
var ok bool
select {
case nextHeader, ok := <-headerChan:
case latestHeader, ok = <-headerChan:
if !ok {
log.Info("delayed sequencer: header channel close")
log.Debug("delayed sequencer: header channel close")
return
}
if err := d.trySequence(ctx, nextHeader); err != nil {
log.Error("Delayed sequencer error", "err", err)
case <-rescanTimer.C:
if latestHeader == nil {
continue
}
case <-ctx.Done():
log.Info("delayed sequencer: context done", "err", ctx.Err())
log.Debug("delayed sequencer: context done", "err", ctx.Err())
return
}
if err := d.trySequence(ctx, latestHeader); err != nil {
log.Error("Delayed sequencer error", "err", err)
}
}
}

Expand Down
2 changes: 0 additions & 2 deletions system_tests/block_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,6 @@ func testBlockValidatorSimple(t *testing.T, opts Options) {
builder.L1.SendWaitTestTransactions(t, []*types.Transaction{
WrapL2ForDelayed(t, delayedTx, builder.L1Info, "User", 100000),
})
// give the inbox reader a bit of time to pick up the delayed message
time.Sleep(time.Millisecond * 500)

// sending l1 messages creates l1 blocks.. make enough to get that delayed inbox message in
for i := 0; i < 30; i++ {
Expand Down

0 comments on commit 36ac667

Please sign in to comment.