Skip to content

Commit

Permalink
Timeboost: swap sequencers seamlessly
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli committed Jan 16, 2025
1 parent 55044a8 commit 88cb846
Show file tree
Hide file tree
Showing 7 changed files with 418 additions and 3 deletions.
72 changes: 72 additions & 0 deletions execution/gethexec/express_lane_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type expressLaneService struct {
earlySubmissionGrace time.Duration
chainConfig *params.ChainConfig
auctionContract *express_lane_auctiongen.ExpressLaneAuction
redisCoordinator *timeboost.RedisCoordinator
roundControl containers.SyncMap[uint64, common.Address] // thread safe

roundInfoMutex sync.Mutex
Expand Down Expand Up @@ -101,6 +102,11 @@ pending:
return nil, err
}

redisCoordinator, err := timeboost.NewRedisCoordinator(seqConfig().Timeboost.RedisUrl, roundTimingInfo.Round)
if err != nil {
return nil, fmt.Errorf("error initializing expressLaneService redis: %w", err)
}

return &expressLaneService{
transactionPublisher: transactionPublisher,
seqConfig: seqConfig,
Expand All @@ -110,6 +116,7 @@ pending:
roundTimingInfo: *roundTimingInfo,
earlySubmissionGrace: earlySubmissionGrace,
auctionContractAddr: auctionContractAddr,
redisCoordinator: redisCoordinator,
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
}, nil
}
Expand Down Expand Up @@ -371,10 +378,16 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
roundInfo.sequence += 1
}

seqCount := roundInfo.sequence
es.roundInfo.Add(msg.Round, roundInfo)
unlockByDefer = false
es.roundInfoMutex.Unlock() // Release lock so that other timeboost txs can be processed

// Persist accepted expressLane txs to redis
if err = es.redisCoordinator.AddAcceptedTx(ctx, msg); err != nil {
log.Error("Error adding accepted ExpressLaneSubmission to redis. Loss of msg possible if sequencer switch happens", "seqNum", msg.SequenceNumber, "txHash", msg.Transaction.Hash(), "err", err)
}

abortCtx, cancel := ctxWithTimeout(ctx, queueTimeout*2) // We use the same timeout value that sequencer imposes
defer cancel()
select {
Expand All @@ -385,6 +398,14 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
}
err = fmt.Errorf("Transaction sequencing hit timeout, result for the submitted transaction is not yet available: %w", abortCtx.Err())
}

// We update the sequence count in redis only after receiving a result for sequencing this message, instead of updating while holding roundInfoMutex,
// because this prevents any loss of transactions when the prev chosen sequencer updates the count but some how fails to forward txs to the current chosen.
// If the prev chosen ends up forwarding the tx, it is ok as the duplicate txs will be discarded
if redisErr := es.redisCoordinator.UpdateSequenceCount(context.Background(), msg.Round, seqCount); redisErr != nil {
log.Error("Error updating round's sequence count in redis", "err", redisErr) // this shouldn't be a problem if future msgs succeed in updating the count
}

if err != nil {
// If the tx fails we return an error with all the necessary info for the controller
return fmt.Errorf("%w: Sequence number: %d (consumed), Transaction hash: %v, Error: %w", timeboost.ErrAcceptedTxFailed, msg.SequenceNumber, msg.Transaction.Hash(), err)
Expand Down Expand Up @@ -430,3 +451,54 @@ func (es *expressLaneService) validateExpressLaneTx(msg *timeboost.ExpressLaneSu
}
return nil
}

func (es *expressLaneService) syncFromRedis() {
es.roundInfoMutex.Lock()
defer es.roundInfoMutex.Unlock()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

currentRound := es.roundTimingInfo.RoundNumber()

// If expressLaneRoundInfo for current round doesn't exist yet, we'll add it to the cache
if !es.roundInfo.Contains(currentRound) {
es.roundInfo.Add(currentRound, &expressLaneRoundInfo{
0,
make(map[uint64]*msgAndResult),
})
}
roundInfo, _ := es.roundInfo.Get(currentRound)

redisSeqCount, err := es.redisCoordinator.GetSequenceCount(ctx, currentRound)
if err != nil {
log.Error("error fetching current round's global sequence count from redis", "err", err)
} else if redisSeqCount > roundInfo.sequence {
roundInfo.sequence = redisSeqCount
}

var msgReadyForSequencing *timeboost.ExpressLaneSubmission
pendingMsgs := es.redisCoordinator.GetAcceptedTxs(ctx, currentRound, roundInfo.sequence)
for _, msg := range pendingMsgs {
// If we get a msg that can be readily sequenced, don't add it to the map
// instead sequence it right after we finish updating the map with rest of the msgs
if msg.SequenceNumber == roundInfo.sequence {
msgReadyForSequencing = msg
} else {
roundInfo.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{
msg: msg,
resultChan: make(chan error, 1), // will never be read from, but required for sequencing of this msg
}
}
}

es.roundInfo.Add(currentRound, roundInfo)

if msgReadyForSequencing != nil {
es.LaunchUntrackedThread(func() {
if err := es.sequenceExpressLaneSubmission(context.Background(), msgReadyForSequencing); err != nil {
log.Error("Untracked expressLaneSubmission returned an error", "round", msgReadyForSequencing.Round, "seqNum", msgReadyForSequencing.SequenceNumber, "txHash", msgReadyForSequencing.Transaction.Hash(), "err", err)
}
})
}
}
110 changes: 109 additions & 1 deletion execution/gethexec/express_lane_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/offchainlabs/nitro/timeboost"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
)

var testPriv, testPriv2 *ecdsa.PrivateKey
Expand Down Expand Up @@ -306,11 +307,15 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_nonceTooLow(t *testin
func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
redisUrl := redisutil.CreateTestRedis(ctx, t)
els := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
}
var err error
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els.roundTimingInfo.Round)
require.NoError(t, err)
els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)})
els.StopWaiter.Start(ctx, els)
els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey))
Expand Down Expand Up @@ -346,11 +351,15 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes
func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
redisUrl := redisutil.CreateTestRedis(ctx, t)
els := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
}
var err error
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els.roundTimingInfo.Round)
require.NoError(t, err)
els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)})
els.StopWaiter.Start(ctx, els)
els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey))
Expand Down Expand Up @@ -389,7 +398,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing
els.roundInfoMutex.Unlock()

wg.Add(2) // 4 & 5 should be able to get in after 3 so we add a delta of 2
err := els.sequenceExpressLaneSubmission(ctx, buildValidSubmissionWithSeqAndTx(t, 0, 3, emptyTx))
err = els.sequenceExpressLaneSubmission(ctx, buildValidSubmissionWithSeqAndTx(t, 0, 3, emptyTx))
require.NoError(t, err)
wg.Wait()
require.Equal(t, 5, len(stubPublisher.publishedTxOrder))
Expand All @@ -403,11 +412,15 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing
func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
redisUrl := redisutil.CreateTestRedis(ctx, t)
els := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
}
var err error
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els.roundTimingInfo.Round)
require.NoError(t, err)
els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)})
els.StopWaiter.Start(ctx, els)
els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey))
Expand Down Expand Up @@ -435,6 +448,101 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.
require.Equal(t, 3, len(stubPublisher.publishedTxOrder))
}

func Test_expressLaneService_syncFromRedis(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
redisUrl := redisutil.CreateTestRedis(ctx, t)
els1 := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
}
var err error
els1.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els1.roundTimingInfo.Round)
require.NoError(t, err)

els1.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)})
els1.StopWaiter.Start(ctx, els1)
els1.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey))
stubPublisher1 := makeStubPublisher(els1)
els1.transactionPublisher = stubPublisher1

messages := []*timeboost.ExpressLaneSubmission{
buildValidSubmissionWithSeqAndTx(t, 0, 1, emptyTx),
buildValidSubmissionWithSeqAndTx(t, 0, 3, emptyTx),
buildValidSubmissionWithSeqAndTx(t, 0, 4, emptyTx),
buildValidSubmissionWithSeqAndTx(t, 0, 5, emptyTx),
}

// We launch 4 goroutines out of which 1 would return with a result hence we add a delta of 5
var wg sync.WaitGroup
wg.Add(5)
for _, msg := range messages {
go func(w *sync.WaitGroup) {
w.Done()
_ = els1.sequenceExpressLaneSubmission(ctx, msg)
if msg.SequenceNumber == 1 {
w.Done()
}
}(&wg)
}
wg.Wait()

// Only one tx out of the three should have been processed
require.Equal(t, 1, len(stubPublisher1.publishedTxOrder))

els2 := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
}
els2.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els2.roundTimingInfo.Round)
require.NoError(t, err)

els2.StopWaiter.Start(ctx, els1)
els2.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey))
stubPublisher2 := makeStubPublisher(els2)
els2.transactionPublisher = stubPublisher2

// As els2 becomes an active sequencer, syncFromRedis would be called when Activate() function of sequencer is invoked
els2.syncFromRedis()

els2.roundInfoMutex.Lock()
roundInfo, exists := els2.roundInfo.Get(0)
if !exists {
t.Fatal("missing roundInfo")
}
if roundInfo.sequence != 2 {
t.Fatalf("round sequence count mismatch. Want: 2, Got: %d", roundInfo.sequence)
}
if len(roundInfo.msgAndResultBySequenceNumber) != 3 { // There should be three pending txs in msgAndResult map
t.Fatalf("number of future sequence txs mismatch. Want: 3, Got: %d", len(roundInfo.msgAndResultBySequenceNumber))
}
els2.roundInfoMutex.Unlock()

err = els2.sequenceExpressLaneSubmission(ctx, buildValidSubmissionWithSeqAndTx(t, 0, 2, emptyTx)) // Send an unblocking tx
require.NoError(t, err)

time.Sleep(time.Second) // wait for future seq num txs to be processed

// Check that all pending txs are sequenced
require.Equal(t, 4, len(stubPublisher2.publishedTxOrder))

// Check final state of roundInfo
els2.roundInfoMutex.Lock()
roundInfo, exists = els2.roundInfo.Get(0)
if !exists {
t.Fatal("missing roundInfo")
}
if roundInfo.sequence != 6 {
t.Fatalf("round sequence count mismatch. Want: 6, Got: %d", roundInfo.sequence)
}
if len(roundInfo.msgAndResultBySequenceNumber) != 0 { // There should be three pending txs in msgAndResult map
t.Fatalf("MsgAndResult map should be empty. Got: %d", len(roundInfo.msgAndResultBySequenceNumber))
}
els2.roundInfoMutex.Unlock()
}

func TestIsWithinAuctionCloseWindow(t *testing.T) {
initialTimestamp := time.Date(2024, 8, 8, 15, 0, 0, 0, time.UTC)
roundTimingInfo := defaultTestRoundTimingInfo(initialTimestamp)
Expand Down
6 changes: 6 additions & 0 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type TimeboostConfig struct {
SequencerHTTPEndpoint string `koanf:"sequencer-http-endpoint"`
EarlySubmissionGrace time.Duration `koanf:"early-submission-grace"`
MaxQueuedTxCount int `koanf:"max-queued-tx-count"`
RedisUrl string `koanf:"redis-url"`
}

var DefaultTimeboostConfig = TimeboostConfig{
Expand All @@ -102,6 +103,7 @@ var DefaultTimeboostConfig = TimeboostConfig{
SequencerHTTPEndpoint: "http://localhost:8547",
EarlySubmissionGrace: time.Second * 2,
MaxQueuedTxCount: 10,
RedisUrl: "",
}

func (c *SequencerConfig) Validate() error {
Expand Down Expand Up @@ -197,6 +199,7 @@ func TimeboostAddOptions(prefix string, f *flag.FlagSet) {
f.String(prefix+".sequencer-http-endpoint", DefaultTimeboostConfig.SequencerHTTPEndpoint, "this sequencer's http endpoint")
f.Duration(prefix+".early-submission-grace", DefaultTimeboostConfig.EarlySubmissionGrace, "period of time before the next round where submissions for the next round will be queued")
f.Int(prefix+".max-queued-tx-count", DefaultTimeboostConfig.MaxQueuedTxCount, "maximum allowed number of express lane txs with future sequence number to be queued. Set 0 to disable this check and a negative value to prevent queuing of any future sequence number transactions")
f.String(prefix+".redis-url", DefaultTimeboostConfig.RedisUrl, "the Redis URL for expressLaneService to coordinate via")
}

type txQueueItem struct {
Expand Down Expand Up @@ -754,6 +757,9 @@ func (s *Sequencer) Activate() {
close(s.pauseChan)
s.pauseChan = nil
}
if s.expressLaneService != nil {
s.expressLaneService.syncFromRedis() // We want sync to complete (which is best effort) before activating the sequencer
}
}

func (s *Sequencer) Pause() {
Expand Down
2 changes: 2 additions & 0 deletions system_tests/timeboost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,9 +1135,11 @@ func setupExpressLaneAuction(
builderSeq.l2StackConfig.JWTSecret = jwtSecretPath
builderSeq.nodeConfig.Feed.Output = *newBroadcasterConfigTest()
builderSeq.execConfig.Sequencer.Enable = true
expressLaneRedisURL := redisutil.CreateTestRedis(ctx, t)
builderSeq.execConfig.Sequencer.Timeboost = gethexec.TimeboostConfig{
Enable: false, // We need to start without timeboost initially to create the auction contract
ExpressLaneAdvantage: time.Second * 5,
RedisUrl: expressLaneRedisURL,
}
builderSeq.nodeConfig.TransactionStreamer.TrackBlockMetadataFrom = 1
cleanupSeq := builderSeq.Build(t)
Expand Down
Loading

0 comments on commit 88cb846

Please sign in to comment.