Cherry picks from celestia (#450)
* Fast Confirmations Issue (#437)

* Enqueue transactions to espresso as soon as they arrive

* Multiple submitted transactions

* Add the enqueue to AddBroadcastMessages

* add messages in write messages so that we can include all messages

* add debug log

---------

Co-authored-by: Jeremy <[email protected]>

* Resubmit hotshot transactions (#441)

* Enqueue transactions to espresso as soon as they arrive

* Multiple submitted transactions

(cherry picked from commit 53ab629)

* Fix

* Resubmit espresso txns

* add log for better debugging

* add tests and fix some logical errors

* fix comments

* fix comments

* address review comments

* go back to old test design to make it explicit

---------

Co-authored-by: Sneh Koul <[email protected]>

* Fix: Batch posting delayed because of the frequency of messages (#440)

* Store last position in batch

* fix merge conflict

* fix build

* re-run ci

* Fix broken CI

* Modify batch poster to increase upper bound of batch if we only have useless messages in the batch

* Address PR feedback and run make fmt

* Update default test to have larger max size for its batches and poll for finality every 2 seconds.

* Update batch poster flow to check validation after we accumulate the batch.

* Remove log that could erroneously suggest espresso validation had been performed on a batch

* Move log to less spammy location

* Update outdated comment

* Respond to review feedback

---------

Co-authored-by: Zach Showalter <[email protected]>

* Log batch posting reason when posting due to max delay. (#449)

* Add better logging to batch poster surrounding posting due to max delay

* Add start message

* Poll to resubmit transactions (#452)

* Poll to resubmit transactions

* fix comment

* fix comment

* Update write logic to add logs and make sure that the message is always enqueued to Espresso (#453)

* Update write logic to add logs and make sure that the message is always enqueued to Espresso

* remove HasNotSubmitted

* remove HasNotSubmitted

* remove HasNotSubmitted

* Lint

---------

Co-authored-by: Sneh Koul <[email protected]>
Co-authored-by: Sneh Koul <[email protected]>
Co-authored-by: Zach Showalter <[email protected]>
4 people authored Jan 22, 2025
1 parent 17734f4 commit 4f797fa
Showing 8 changed files with 362 additions and 279 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -244,7 +244,7 @@ jobs:
scripts/espresso-run-test-ci -tags=stylustest -run=TestProgramArbitrator
- name: Archive detailed run log
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: ${{ matrix.test-mode }}-full.log
path: full.log
125 changes: 58 additions & 67 deletions arbnode/batch_poster.go
@@ -186,6 +186,7 @@ type BatchPosterConfig struct {
HotShotUrl string `koanf:"hotshot-url"`
UseEscapeHatch bool `koanf:"use-escape-hatch"`
EspressoTxnsPollingInterval time.Duration `koanf:"espresso-txns-polling-interval"`
ResubmitEspressoTxDeadline time.Duration `koanf:"resubmit-espresso-tx-deadline"`
// MaxBlockLagBeforeEscapeHatch specifies the maximum number of L1 blocks that HotShot
// state updates can lag behind before triggering the escape hatch. If the difference
// between the current L1 block number and the latest state update's block number
@@ -254,6 +255,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".check-batch-correctness", DefaultBatchPosterConfig.CheckBatchCorrectness, "setting this to true will run the batch against an inbox multiplexer and verifies that it produces the correct set of messages")
f.Bool(prefix+".use-escape-hatch", DefaultBatchPosterConfig.UseEscapeHatch, "if true, Escape Hatch functionality will be used")
f.Duration(prefix+".espresso-txns-polling-interval", DefaultBatchPosterConfig.EspressoTxnsPollingInterval, "interval between polling for transactions to be included in the block")
f.Duration(prefix+".resubmit-espresso-tx-deadline", DefaultBatchPosterConfig.ResubmitEspressoTxDeadline, "time threshold after which a transaction will be automatically resubmitted if no response is received")
f.Uint64(prefix+".max-block-lag-before-escape-hatch", DefaultBatchPosterConfig.MaxBlockLagBeforeEscapeHatch, "specifies the switch delay threshold used to determine hotshot liveness")
f.Duration(prefix+".max-empty-batch-delay", DefaultBatchPosterConfig.MaxEmptyBatchDelay, "maximum empty batch posting delay, batch poster will only be able to post an empty batch if this time period building a batch has passed")
redislock.AddConfigOptions(prefix+".redis-lock", f)
@@ -289,7 +291,8 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
ReorgResistanceMargin: 10 * time.Minute,
CheckBatchCorrectness: true,
UseEscapeHatch: false,
EspressoTxnsPollingInterval: time.Millisecond * 500,
EspressoTxnsPollingInterval: time.Second,
ResubmitEspressoTxDeadline: 10 * time.Minute,
MaxBlockLagBeforeEscapeHatch: 350,
LightClientAddress: "",
HotShotUrl: "",
@@ -326,10 +329,11 @@ var TestBatchPosterConfig = BatchPosterConfig{
GasEstimateBaseFeeMultipleBips: arbmath.OneInUBips * 3 / 2,
CheckBatchCorrectness: true,
UseEscapeHatch: false,
EspressoTxnsPollingInterval: time.Millisecond * 500,
EspressoTxnsPollingInterval: time.Second,
MaxBlockLagBeforeEscapeHatch: 10,
LightClientAddress: "",
HotShotUrl: "",
ResubmitEspressoTxDeadline: 10 * time.Second,
}
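
For orientation, here is a minimal sketch of the resubmission loop these two knobs drive (per the "Poll to resubmit transactions" commit above): wake up every EspressoTxnsPollingInterval and re-enqueue anything that has been pending longer than ResubmitEspressoTxDeadline. The pending map and resubmit callback are hypothetical stand-ins, not the transaction streamer's actual API; the sketch assumes the context, time, and go-ethereum log imports.

// Sketch only: re-submit any Espresso transaction whose confirmation has
// been pending longer than ResubmitEspressoTxDeadline. The pending map and
// resubmit callback are hypothetical, not the transaction streamer's API.
func pollAndResubmit(ctx context.Context, cfg *BatchPosterConfig, pending map[string]time.Time, resubmit func(hash string) error) {
	ticker := time.NewTicker(cfg.EspressoTxnsPollingInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for hash, submittedAt := range pending {
				if time.Since(submittedAt) < cfg.ResubmitEspressoTxDeadline {
					continue
				}
				if err := resubmit(hash); err != nil {
					log.Error("failed to resubmit espresso transaction", "hash", hash, "err", err)
					continue
				}
				pending[hash] = time.Now() // restart the deadline clock for this txn
			}
		}
	}
}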

type BatchPosterOpts struct {
@@ -392,6 +396,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
opts.Streamer.espressoTxnsPollingInterval = opts.Config().EspressoTxnsPollingInterval
opts.Streamer.maxBlockLagBeforeEscapeHatch = opts.Config().MaxBlockLagBeforeEscapeHatch
opts.Streamer.espressoMaxTransactionSize = espressoTransactionSizeLimit
opts.Streamer.resubmitEspressoTxDeadline = opts.Config().ResubmitEspressoTxDeadline
}

b := &BatchPoster{
@@ -566,49 +571,29 @@ var EspressoFetchTransactionErr = errors.New("failed to fetch the espresso trans

// Adds a block merkle proof to an Espresso justification, providing a proof that a set of transactions
// hashes to some light client state root.
func (b *BatchPoster) checkEspressoValidation() error {
func (b *BatchPoster) checkEspressoValidation() bool {
b.building.segments.SetWaitingForValidation()
if b.streamer.espressoClient == nil && b.streamer.lightClientReader == nil {
// We are not using espresso mode since these haven't been set
return nil
// We are not using espresso mode since these haven't been set, return true to advance batch posting
return true
}
if b.streamer.EscapeHatchEnabled {
log.Warn("skipped espresso verification due to hotshot failure", "pos", b.building.msgCount)
return true // return true to skip verification of batch
}

lastConfirmed, err := b.streamer.getLastConfirmedPos()
if err != nil {
log.Error("failed call to get last confirmed pos", "err", err)
return err
return false // if we get an error we can't validate
}

// This message has passed the espresso verification
if lastConfirmed != nil && b.building.msgCount <= *lastConfirmed {
return nil
}

if b.streamer.EscapeHatchEnabled {
log.Warn("skipped espresso verification due to hotshot failure", "pos", b.building.msgCount)
return nil
}

return fmt.Errorf("%w (height: %d)", EspressoValidationErr, b.building.msgCount)
}

func (b *BatchPoster) enqueuePendingTransaction(pos arbutil.MessageIndex) error {
hasNotSubmitted, err := b.streamer.HasNotSubmitted(pos)
if err != nil {
return err
}
if !hasNotSubmitted {
return nil
}

// Store the pos in the database to be used later to submit the message
// to hotshot for finalization.
err = b.streamer.SubmitEspressoTransactionPos(pos)
if err != nil {
log.Error("failed to submit espresso transaction pos", "pos", pos, "err", err)
return err
if lastConfirmed != nil && b.building.msgCount-1 <= *lastConfirmed {
return true
}

return nil
// If we aren't skipping validation for this batch, or we can't validate the proofs, we need to retry.
return false
}

type txInfo struct {
@@ -810,19 +795,20 @@ func (b *BatchPoster) getBatchPosterPosition(ctx context.Context, blockNum *big.
var errBatchAlreadyClosed = errors.New("batch segments already closed")

type batchSegments struct {
compressedBuffer *bytes.Buffer
compressedWriter *brotli.Writer
rawSegments [][]byte
timestamp uint64
blockNum uint64
delayedMsg uint64
sizeLimit int
recompressionLevel int
newUncompressedSize int
totalUncompressedSize int
lastCompressedSize int
trailingHeaders int // how many trailing segments are headers
isDone bool
compressedBuffer *bytes.Buffer
compressedWriter *brotli.Writer
rawSegments [][]byte
timestamp uint64
blockNum uint64
delayedMsg uint64
sizeLimit int
recompressionLevel int
newUncompressedSize int
totalUncompressedSize int
lastCompressedSize int
trailingHeaders int // how many trailing segments are headers
isDone bool
isWaitingForEspressoValidation bool // We are waiting for the entirety of the batch to be validated by espresso. Should be false by default
}

type buildingBatch struct {
@@ -1017,6 +1003,12 @@ func (s *batchSegments) addDelayedMessage() (bool, error) {
}

func (s *batchSegments) AddMessage(msg *arbostypes.MessageWithMetadata) (bool, error) {

if s.isWaitingForEspressoValidation {
log.Info("Current batch is waiting for espresso validation, we won't add more messages")
// if we are waiting for espresso validation return that the batch is full with no error
return false, nil
}
if s.isDone {
return false, errBatchAlreadyClosed
}
@@ -1063,6 +1055,13 @@ func (s *batchSegments) CloseAndGetBytes() ([]byte, error) {
return fullMsg, nil
}

// SetWaitingForValidation marks the batch segments as waiting for Espresso validation. A setter keeps the struct's state unexported, and the flag never needs to be reset to false.
func (s *batchSegments) SetWaitingForValidation() {
if !s.isWaitingForEspressoValidation {
log.Info("Set current batch segments to waiting for validation")
s.isWaitingForEspressoValidation = true
}
}
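
Together with the guard added to AddMessage above, this flag effectively seals the batch while validation is pending. A small illustration of the caller-side contract (hypothetical snippet, not code from this commit):

// Once validation starts, AddMessage returns (false, nil), the same signal
// as a full batch, so the caller stops growing the batch and simply
// re-polls maybePostSequencerBatch until validation completes.
segments.SetWaitingForValidation()
ok, err := segments.AddMessage(msg) // ok == false, err == nil while waiting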
func (b *BatchPoster) encodeAddBatch(
seqNum *big.Int,
prevMsgNum arbutil.MessageIndex,
@@ -1262,6 +1261,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)

dbBatchCount, err := b.inbox.GetBatchCount()
if err != nil {
log.Error("Error getting batch count", "err", err)
return false, err
}
if dbBatchCount > batchPosition.NextSeqNum {
@@ -1318,15 +1318,16 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
msgCount, err := b.streamer.GetMessageCount()
if err != nil {
log.Error("Error getting message count", "err", err)
return false, err
}
if msgCount <= batchPosition.MessageCount {
// There's nothing after the newest batch, therefore batch posting was not required
return false, nil
}

lastPotentialMsg, err := b.streamer.GetMessage(msgCount - 1)
if err != nil {

return false, err
}

@@ -1401,24 +1402,12 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
}

// Submit message positions to pending queue
shouldSubmit := b.streamer.shouldSubmitEspressoTransaction()
if shouldSubmit {
for p := b.building.msgCount; p < msgCount; p += 1 {
err = b.enqueuePendingTransaction(p)
if err != nil {
log.Error("error submitting position", "error", err, "pos", p)
break
}
}
}

for b.building.msgCount < msgCount {
msg, err := b.streamer.GetMessage(b.building.msgCount)
if err != nil {
log.Error("error getting message from streamer", "error", err)
return false, fmt.Errorf("error getting message from streamer: %w", err)
}

if msg.Message.Header.BlockNumber < l1BoundMinBlockNumberWithBypass || msg.Message.Header.Timestamp < l1BoundMinTimestampWithBypass {
log.Warn(
"disabling L1 bound as batch posting message is close to the maximum delay",
@@ -1443,10 +1432,6 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
break
}

err = b.checkEspressoValidation()
if err != nil {
return false, fmt.Errorf("error checking espresso valdiation: %w", err)
}
isDelayed := msg.DelayedMessagesRead > b.building.segments.delayedMsg
success, err := b.building.segments.AddMessage(msg)
if err != nil {
@@ -1490,6 +1475,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
// #nosec G115
firstUsefulMsgTime = time.Unix(int64(b.building.firstUsefulMsg.Message.Header.Timestamp), 0)
if time.Since(firstUsefulMsgTime) >= config.MaxDelay {
log.Info("attempting to post batch due to max delay", "firstUsefulMsgTime", firstUsefulMsgTime, "first useful msg timestamp", b.building.firstUsefulMsg.Message.Header.Timestamp)
forcePostBatch = true
}
}
@@ -1519,7 +1505,12 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
// don't post anything for now
return false, nil
}

// checkEspressoValidation marks the batch segments as waiting for validation; we re-poll this function until the batch is validated and ready to post.
hasBatchBeenValidated := b.checkEspressoValidation()
log.Info("Batch validation status", "hasBatchBeenValidated", hasBatchBeenValidated, "b.building.msgCount", b.building.msgCount, "b.building.startMsgCount", b.building.startMsgCount)
if !hasBatchBeenValidated {
return false, nil // Return (false, nil) rather than an error: propagating an error would clear the batch cache, which we don't want.
}
sequencerMsg, err := b.building.segments.CloseAndGetBytes()
if err != nil {
return false, err
6 changes: 6 additions & 0 deletions arbnode/espresso_utils.go
@@ -17,6 +17,12 @@ const MAX_ATTESTATION_QUOTE_SIZE int = 4 * 1024
const LEN_SIZE int = 8
const INDEX_SIZE int = 8

type SubmittedEspressoTx struct {
Hash string
Pos []arbutil.MessageIndex
Payload []byte
}

func buildRawHotShotPayload(
msgPositions []arbutil.MessageIndex,
msgFetcher func(arbutil.MessageIndex) ([]byte, error),
33 changes: 33 additions & 0 deletions arbnode/espresso_utils_test.go
@@ -8,6 +8,8 @@ import (

espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types"

"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbutil"
)

@@ -123,3 +125,34 @@ func TestParsePayloadInvalidCases(t *testing.T) {
})
}
}

func TestSerdeSubmittedEspressoTx(t *testing.T) {
submittedTx := SubmittedEspressoTx{
Hash: "0x1234",
Pos: []arbutil.MessageIndex{arbutil.MessageIndex(10)},
Payload: []byte{0, 1, 2, 3},
}

b, err := rlp.EncodeToBytes(&submittedTx)
if err != nil {
t.Error("failed to encode")
}

var decoded SubmittedEspressoTx
err = rlp.DecodeBytes(b, &decoded)
if err != nil {
t.Error("failed to decode")
}

if submittedTx.Hash != decoded.Hash {
t.Error("hash mismatch after round-trip")
}

if submittedTx.Pos[0] != decoded.Pos[0] {
t.Error("pos mismatch after round-trip")
}

if !bytes.Equal(submittedTx.Payload, decoded.Payload) {
t.Error("payload mismatch after round-trip")
}
}
21 changes: 9 additions & 12 deletions arbnode/schema.go
@@ -4,22 +4,19 @@
package arbnode

var (
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count

messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count
messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
espressoSubmittedPos []byte = []byte("_espressoSubmittedPos") // contains the current message indices of the last submitted txns
espressoSubmittedHash []byte = []byte("_espressoSubmittedHash") // contains the hash of the last submitted txn
espressoSubmittedPayload []byte = []byte("_espressoSubmittedPayload") // contains the payload of the last submitted espresso txn
espressoSubmittedTxns []byte = []byte("_espressoSubmittedTxns") // contains the hash and pos of the submitted transactions
espressoPendingTxnsPositions []byte = []byte("_espressoPendingTxnsPos") // contains the index of the pending txns that need to be submitted to espresso
espressoLastConfirmedPos []byte = []byte("_espressoLastConfirmedPos") // contains the position of the last confirmed message
)
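
The three per-transaction keys removed above (espressoSubmittedPos, espressoSubmittedHash, espressoSubmittedPayload) collapse into the single espressoSubmittedTxns entry, which holds the whole list of in-flight submissions. A rough sketch of how that key might be written and read, assuming the list is RLP-encoded like the SubmittedEspressoTx round-trip test above (the helper names and ethdb plumbing are illustrative, not the streamer's actual code):

// Illustrative only: persist and load the full slice of submitted txns
// under one key. Assumes go-ethereum's ethdb and rlp packages.
func storeSubmittedTxns(db ethdb.KeyValueWriter, txns []SubmittedEspressoTx) error {
	b, err := rlp.EncodeToBytes(txns)
	if err != nil {
		return err
	}
	return db.Put(espressoSubmittedTxns, b)
}

func readSubmittedTxns(db ethdb.KeyValueReader) ([]SubmittedEspressoTx, error) {
	b, err := db.Get(espressoSubmittedTxns)
	if err != nil {
		return nil, err
	}
	var txns []SubmittedEspressoTx
	if err := rlp.DecodeBytes(b, &txns); err != nil {
		return nil, err
	}
	return txns, nil
}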
