feat(batcher): multi-frame altda channels #310

Merged (5 commits) on Feb 14, 2025
10 changes: 5 additions & 5 deletions op-batcher/batcher/channel.go
@@ -125,29 +125,29 @@ func (c *channel) ID() derive.ChannelID {
}

// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
-// If cfg.UseBlobs is false, it returns txData with a single frame.
-// If cfg.UseBlobs is true, it will read frames from its channel builder
+// If cfg.DaType == DaTypeCalldata, it returns txData with a single frame.
+// Else when cfg.DaType == DaTypeBlob or DaTypeAltDA, it will read frames from its channel builder
// until it either doesn't have more frames or the target number of frames is reached.
//
// NextTxData should only be called after HasTxData returned true.
func (c *channel) NextTxData() txData {
nf := c.cfg.MaxFramesPerTx()
Hey @gastonponti, I'm wondering how you set target-num-frames (aka MaxFramesPerTx) to make sure that your frames don't exceed the max frame size of 128KB?

Author:
There's a flag that you can use to set that:

	TargetNumFramesFlag = &cli.IntFlag{
		Name: "target-num-frames",
		Usage: "The target number of frames to create per channel. " +
			"Controls number of blobs per blob tx, if using Blob DA, " +
			"or number of frames per blob, if using altDA.",
		Value:   1,
		EnvVars: prefixEnvVars("TARGET_NUM_FRAMES"),
	}

There are two different things here: the frame size on one side, and the "tx for the L1 data" size on the other.

The frame size is also configurable, and we should reduce it. We strictly don't need it to be 128KB, because we are not switching to blobs on eth but to txData on eth. But if we set it to 128KB, it will be easier to switch to an eth blob scenario if we want, because we won't need to think about that size. The downside of setting it to 128KB rather than 200KB is that you will need more txs in a full-blocks scenario, but that is a fallback scenario we shouldn't see often if EigenDA works as it should.
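As a back-of-the-envelope check on that tradeoff, here is a rough sketch (the `txsNeeded` helper is made up for illustration, and real framing adds per-frame header overhead that this ignores):

```go
package main

import "fmt"

// txsNeeded returns how many single-frame calldata txs are needed to post
// `payload` bytes when each frame is capped at `frameSize` bytes.
// Illustrative only: real frames carry additional header bytes.
func txsNeeded(payload, frameSize int) int {
	return (payload + frameSize - 1) / frameSize // ceiling division
}

func main() {
	payload := 1_000_000 // ~1MB of channel data in a fallback-to-calldata scenario
	fmt.Println(txsNeeded(payload, 128_000)) // 8 txs with ~128KB frames
	fmt.Println(txsNeeded(payload, 200_000)) // 5 txs with ~200KB frames
}
```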

MaxFramesPerTx is used for all the DA types that are NOT calldata (the DaTypeCalldata introduced in this PR):

func (cc *ChannelConfig) MaxFramesPerTx() int {
	if cc.DaType == DaTypeCalldata {
		return 1
	}
	return cc.TargetNumFrames
}

With that you are guaranteeing that, in the case of switching to ethDA, you will have only one frame in the tx, and you won't exceed the max L1 tx data size.
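A minimal standalone sketch of that guarantee, using an illustrative copy of the PR's DaType constants (`maxFramesPerTx` here is a stand-in for the ChannelConfig.MaxFramesPerTx method quoted above):

```go
package main

import "fmt"

// DaType mirrors the enum introduced in this PR (illustrative copy).
type DaType int

const (
	DaTypeCalldata DaType = iota
	DaTypeBlob
	DaTypeAltDA
)

// maxFramesPerTx mirrors ChannelConfig.MaxFramesPerTx: calldata txs are
// always single-frame, while blob and altDA submissions carry up to
// targetNumFrames frames each.
func maxFramesPerTx(da DaType, targetNumFrames int) int {
	if da == DaTypeCalldata {
		return 1
	}
	return targetNumFrames
}

func main() {
	fmt.Println(maxFramesPerTx(DaTypeCalldata, 6)) // 1: ethDA fallback stays within one L1 tx
	fmt.Println(maxFramesPerTx(DaTypeAltDA, 6))    // 6: one altDA blob holds up to 6 frames
}
```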

Collaborator:
So do we need to set TARGET_NUM_FRAMES, or do we want to change the default in that flag?

Author:
We only need to set the TARGET_NUM_FRAMES flag.

Collaborator:
Can we document those values somewhere?

Author:
The frames are capped by this flag:

	MaxL1TxSizeBytesFlag = &cli.Uint64Flag{
		Name:    "max-l1-tx-size-bytes",
		Usage:   "The maximum size of a batch tx submitted to L1. Ignored for blobs, where max blob size will be used.",
		Value:   120_000, // will be overwritten to max for blob da-type
		EnvVars: prefixEnvVars("MAX_L1_TX_SIZE_BYTES"),
	}

Actually it is MaxL1TxSizeBytes - 1.

It's not a moving value that depends on the channel; the channel depends on the frame size × number of frames.

Author:
The "Ignored for blobs" part refers to ethDA blobs, which we are not using. So our frame cap is actually set in the configuration.

Actually I think I've understood.

- Channel contains whatever unsafe data from L2
- Channels produce a stream of txData objects (as many as required)
- TxData objects contain TARGET_NUM_FRAMES frames
- Frames are up to MaxFrameSize big

But now I'm wondering, how is it enforced that the txdata is within the required size limits?

Author:
Yeah, that txData name is not the best, to be honest.

But with that txData: if it goes to altDA, it builds a blob with N frames; if it goes to ethDA as tx calldata (not blobs), it sends one tx per frame. So the frame size is what limits that.
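A rough sketch of that routing (the `submissions` helper is hypothetical, not part of the PR; it just counts how many L1 submissions a set of pending frames becomes per DA path):

```go
package main

import "fmt"

// submissions reports how many L1 submissions a set of frames becomes and
// the largest single payload. The altDA/blob path batches all frames into
// one submission; the calldata path sends one tx per frame, so each payload
// is bounded by the frame size.
func submissions(frameSizes []int, calldata bool) (count, maxPayload int) {
	if calldata {
		for _, s := range frameSizes {
			if s > maxPayload {
				maxPayload = s
			}
		}
		return len(frameSizes), maxPayload
	}
	for _, s := range frameSizes {
		maxPayload += s
	}
	return 1, maxPayload
}

func main() {
	frames := []int{120_000, 120_000, 40_000}
	fmt.Println(submissions(frames, false)) // altDA: one blob of 280_000 bytes
	fmt.Println(submissions(frames, true))  // calldata: 3 txs, largest 120_000 bytes
}
```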

@ezdac (Feb 14, 2025):
@piersy

	var buf bytes.Buffer
	fn, err := c.co.OutputFrame(&buf, c.cfg.MaxFrameSize)
	if err != io.EOF && err != nil {
		return fmt.Errorf("writing frame[%d]: %w", fn, err)
	}
	// Mark as full if max index reached
	// TODO: If there's still data in the compression pipeline of the channel out,
	// we would miss it and the whole channel would be broken because the last
	// frames would never be generated...
	// Hitting the max index is impossible with current parameters, so ignore for
	// now. Note that in order to properly catch this, we'd need to call Flush
	// after every block addition to estimate how many more frames are coming.
	if fn == math.MaxUint16 {
		c.setFullErr(ErrMaxFrameIndex)
	}
	frame := frameData{
		id:   frameID{chID: c.co.ID(), frameNumber: fn},
		data: buf.Bytes(),
	}

Within OutputFrame(), the passed-in size is used as the size of a buffer to which data from the compression stage is written. The resulting buffer is then used as frameData in the batcher.

-	txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs}
+	txdata := txData{frames: make([]frameData, 0, nf), daType: c.cfg.DaType}
for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ {
frame := c.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}

id := txdata.ID().String()
-	c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
+	c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "da_type", txdata.daType)
c.pendingTransactions[id] = txdata

return txdata
}

func (c *channel) HasTxData() bool {
 	if c.IsFull() || // If the channel is full, we should start to submit it
-		!c.cfg.UseBlobs { // If using calldata, we only send one frame per tx
+		c.cfg.DaType == DaTypeCalldata { // If using calldata, we only send one frame per tx
return c.channelBuilder.HasPendingFrame()
}
// Collect enough frames if channel is not full yet
11 changes: 7 additions & 4 deletions op-batcher/batcher/channel_config.go
@@ -46,9 +46,12 @@ type ChannelConfig struct {
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint

-	// UseBlobs indicates that this channel should be sent as a multi-blob
-	// transaction with one blob per frame.
-	UseBlobs bool
+	// DaType indicates how the frames in this channel should be sent to the L1.
+	DaType DaType
 }
 
+func (cc ChannelConfig) UseBlobs() bool {
+	return cc.DaType == DaTypeBlob
+}

// ChannelConfig returns a copy of the receiver.
@@ -93,7 +96,7 @@ func (cc *ChannelConfig) ReinitCompressorConfig() {
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
-	if !cc.UseBlobs {
+	if cc.DaType == DaTypeCalldata {
return 1
}
return cc.TargetNumFrames
3 changes: 2 additions & 1 deletion op-batcher/batcher/channel_config_provider_test.go
@@ -31,11 +31,12 @@ func TestDynamicEthChannelConfig_ChannelConfig(t *testing.T) {
calldataCfg := ChannelConfig{
MaxFrameSize: 120_000 - 1,
TargetNumFrames: 1,
+	DaType: DaTypeCalldata,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 3, // gets closest to amortized fixed tx costs
-	UseBlobs: true,
+	DaType: DaTypeBlob,
}

tests := []struct {
10 changes: 5 additions & 5 deletions op-batcher/batcher/channel_manager.go
@@ -207,16 +207,16 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool) (txData, erro
newCfg := s.cfgProvider.ChannelConfig(isPectra)

// No change:
-	if newCfg.UseBlobs == s.defaultCfg.UseBlobs {
+	if newCfg.UseBlobs() == s.defaultCfg.UseBlobs() {
 		s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type",
-			"useBlobs", s.defaultCfg.UseBlobs)
+			"useBlobs", s.defaultCfg.UseBlobs())
return s.nextTxData(channel)
}

// Change:
s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeing blocks...",
-		"useBlobsBefore", s.defaultCfg.UseBlobs,
-		"useBlobsAfter", newCfg.UseBlobs)
+		"useBlobsBefore", s.defaultCfg.UseBlobs(),
+		"useBlobsAfter", newCfg.UseBlobs())

// Invalidate the channel so its blocks
// get requeued:
@@ -317,7 +317,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", cfg.TargetNumFrames,
"max_frame_size", cfg.MaxFrameSize,
-		"use_blobs", cfg.UseBlobs,
+		"da_type", cfg.DaType,
)
s.metr.RecordChannelOpened(pc.ID(), s.pendingBlocks())

9 changes: 5 additions & 4 deletions op-batcher/batcher/channel_manager_test.go
@@ -290,11 +290,12 @@ func newFakeDynamicEthChannelConfig(lgr log.Logger,
calldataCfg := ChannelConfig{
MaxFrameSize: 120_000 - 1,
TargetNumFrames: 1,
+	DaType: DaTypeCalldata,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 3, // gets closest to amortized fixed tx costs
-	UseBlobs: true,
+	DaType: DaTypeBlob,
}
calldataCfg.InitNoneCompressor()
blobCfg.InitNoneCompressor()
@@ -348,7 +349,7 @@ func TestChannelManager_TxData(t *testing.T) {

cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
-	require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs)
+	require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.DaType == DaTypeBlob)

// Seed channel manager with a block
rng := rand.New(rand.NewSource(99))
Expand Down Expand Up @@ -385,8 +386,8 @@ func TestChannelManager_TxData(t *testing.T) {
}

require.Equal(t, tc.numExpectedAssessments, cfg.assessments)
-	require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.asBlob)
-	require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.UseBlobs)
+	require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.daType == DaTypeBlob)
+	require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.DaType == DaTypeBlob)
})
}

4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_test.go
@@ -131,7 +131,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
const n = 6
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
-	UseBlobs: false,
+	DaType: DaTypeCalldata,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
@@ -172,7 +172,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
const n = eth.MaxBlobsPerBlobTx
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
-	UseBlobs: true,
+	DaType: DaTypeBlob,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
29 changes: 12 additions & 17 deletions op-batcher/batcher/driver.go
@@ -780,14 +780,6 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh

// publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1.
func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
-	// sanity checks
-	if nf := len(txdata.frames); nf != 1 {
-		l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
-	}
-	if txdata.asBlob {
-		l.Log.Crit("Unexpected blob txdata with AltDA enabled")
-	}

// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while for the request to return.
goroutineSpawned := daGroup.TryGo(func() error {
@@ -827,29 +819,32 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
 	var err error
-
-	// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
-	if l.Config.UseAltDA {
+	var candidate *txmgr.TxCandidate
+	switch txdata.daType {
+	case DaTypeAltDA:
+		if !l.Config.UseAltDA {
+			l.Log.Crit("Received AltDA type txdata without AltDA being enabled")
+		}
+		// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
 		l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup)
 		// we return nil to allow publishStateToL1 to keep processing the next txdata
 		return nil
-	}
-
-	var candidate *txmgr.TxCandidate
-	if txdata.asBlob {
+	case DaTypeBlob:
 		if candidate, err = l.blobTxCandidate(txdata); err != nil {
 			// We could potentially fall through and try a calldata tx instead, but this would
 			// likely result in the chain spending more in gas fees than it is tuned for, so best
 			// to just fail. We do not expect this error to trigger unless there is a serious bug
 			// or configuration issue.
 			return fmt.Errorf("could not create blob tx candidate: %w", err)
 		}
-	} else {
+	case DaTypeCalldata:
+		// sanity check
+		if nf := len(txdata.frames); nf != 1 {
+			l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
+		}
 		candidate = l.calldataTxCandidate(txdata.CallData())
+	default:
+		l.Log.Crit("Unknown DA type", "da_type", txdata.daType)
 	}

l.sendTx(txdata, false, candidate, queue, receiptsCh)
@@ -867,7 +862,7 @@ func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.T
candidate.GasLimit = intrinsicGas
}

-	queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh)
+	queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.daType == DaTypeBlob}, *candidate, receiptsCh)
}

func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
44 changes: 25 additions & 19 deletions op-batcher/batcher/service.go
@@ -218,33 +218,39 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
TargetNumFrames: cfg.TargetNumFrames,
SubSafetyMargin: cfg.SubSafetyMargin,
BatchType: cfg.BatchType,
// DaType: set below
}

-	switch cfg.DataAvailabilityType {
-	case flags.BlobsType, flags.AutoType:
-		if !cfg.TestUseMaxTxSizeForBlobs {
-			// account for version byte prefix
-			cc.MaxFrameSize = eth.MaxBlobDataSize - 1
-		}
-		cc.UseBlobs = true
-	case flags.CalldataType: // do nothing
-	default:
-		return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
-	}
-
-	if bs.UseAltDA && cfg.DataAvailabilityType != flags.CalldataType {
-		return fmt.Errorf("cannot use Blobs with Alt DA")
-	}
-	if bs.UseAltDA && cc.MaxFrameSize > altda.MaxInputSize {
-		return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize)
-	}
+	if bs.UseAltDA {
+		if cfg.DataAvailabilityType == flags.CalldataType {
+			cc.DaType = DaTypeAltDA
+		} else {
+			return fmt.Errorf("altDA is currently only supported with calldata DA Type")
+		}
+		if cc.MaxFrameSize > altda.MaxInputSize {
+			return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize)
+		}
+	} else {
+		switch cfg.DataAvailabilityType {
+		case flags.BlobsType, flags.AutoType:
+			if !cfg.TestUseMaxTxSizeForBlobs {
+				// account for version byte prefix
+				cc.MaxFrameSize = eth.MaxBlobDataSize - 1
+			}
+			cc.DaType = DaTypeBlob
+		case flags.CalldataType:
+			cc.DaType = DaTypeCalldata
+		default:
+			return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
+		}
+	}

cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo)

-	if cc.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
+	if cc.UseBlobs() && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
 		return errors.New("cannot use Blobs before Ecotone")
 	}
-	if !cc.UseBlobs && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
+	if !cc.UseBlobs() && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!")
}

@@ -276,7 +282,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
calldataCC := cc
calldataCC.TargetNumFrames = 1
calldataCC.MaxFrameSize = 120_000
-	calldataCC.UseBlobs = false
+	calldataCC.DaType = DaTypeCalldata
calldataCC.ReinitCompressorConfig()

bs.ChannelConfig = NewDynamicEthChannelConfig(bs.Log, 10*time.Second, bs.TxManager, cc, calldataCC)
2 changes: 1 addition & 1 deletion op-batcher/batcher/test_batch_submitter.go
@@ -28,7 +28,7 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
var candidate *txmgr.TxCandidate
var err error
cc := l.channelMgr.cfgProvider.ChannelConfig(true)
-	if cc.UseBlobs {
+	if cc.UseBlobs() {
candidate = l.calldataTxCandidate([]byte{})
} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
return err
15 changes: 14 additions & 1 deletion op-batcher/batcher/tx_data.go
@@ -9,14 +9,27 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)

// DaType determines how txData is submitted to L1.
type DaType int

const (
// DaTypeCalldata means that the (single) frame in the txData is submitted as calldata.
DaTypeCalldata DaType = iota
// DaTypeBlob means that the frame(s) in the txData are submitted as ethereum 4844 blobs.
DaTypeBlob
// DaTypeAltDA means that the frame(s) in the txData are submitted to an altda da-server.
DaTypeAltDA
)

// txData represents the data for a single transaction.
//
// Note: The batcher currently sends exactly one frame per transaction. This
// might change in the future to allow for multiple frames from possibly
// different channels.
type txData struct {
frames []frameData
-	asBlob bool // indicates whether this should be sent as blob
+	// daType represents the DA type which the frames data will be submitted to.
+	daType DaType
}

func singleFrameTxData(frame frameData) txData {
6 changes: 4 additions & 2 deletions op-batcher/flags/flags.go
@@ -82,8 +82,10 @@ var (
EnvVars: prefixEnvVars("MAX_BLOCKS_PER_SPAN_BATCH"),
}
TargetNumFramesFlag = &cli.IntFlag{
-		Name:  "target-num-frames",
-		Usage: "The target number of frames to create per channel. Controls number of blobs per blob tx, if using Blob DA.",
+		Name: "target-num-frames",
+		Usage: "The target number of frames to create per channel. " +
+			"Controls number of blobs per blob tx, if using Blob DA, " +
+			"or number of frames per blob, if using altDA.",
Value: 1,
EnvVars: prefixEnvVars("TARGET_NUM_FRAMES"),
}