Skip to content

Commit

Permalink
op-batcher: Rework batcher to use submission safety margin
Browse files Browse the repository at this point in the history
- Removes the ChannelSubTimeout config flag and instead applies the
  SubSafetyMargin, a safety margin expressed in number of L1 blocks, to both
  the channel timeout and the sequencing window timeout
- Remove unused flags SequencerBatchInboxAddress and ChannelTimeout as
  these values are now queried from the rollup node.
  • Loading branch information
sebastianst committed Feb 3, 2023
1 parent e864ee9 commit a2189aa
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 120 deletions.
55 changes: 28 additions & 27 deletions op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ type (
channelBuilder struct {
cfg ChannelConfig

// L1 block timestamp of channel timeout. 0 if no timeout set yet.
// L1 block number of the combined channel & sequencing window timeout. 0 if
// no timeout set yet.
timeout uint64

// sequencer window timeout block. 0 if not set yet.
swTimeoutBlock uint64

// marked as full if a) max RLP input bytes, b) max num frames or c) max
// allowed frame index (uint16) has been reached
fullErr error
Expand All @@ -41,13 +39,10 @@ type (
// The maximum number of L1 blocks that the inclusion transactions of a
// channel's frames can span.
ChannelTimeout uint64
// ChannelSubTimeout is the maximum duration, in seconds, to attempt
// completing an opened channel. When reached, the channel is closed and all
// remaining frames are submitted. The batcher should set it shorter than
// the actual channel timeout (specified in number of L1 blocks), since
// submitting continued channel data to L1 is not instantaneous. It's not
// worth it to work with nearly timed-out channels.
ChannelSubTimeout uint64
// The batcher tx submission safety margin (in #L1-blocks) to subtract from
// a channel's timeout and sequencing window, to guarantee safe inclusion of
// a channel on L1.
SubSafetyMargin uint64
// The maximum byte-size a frame can have.
MaxFrameSize uint64
// The target number of frames to create per channel. Note that if the
Expand Down Expand Up @@ -128,25 +123,23 @@ func (c *channelBuilder) Reset() error {
}

// FramePublished calculates the submission timeout of this channel from the
// given frame inclusion tx timestamp. If an older frame tx has already been
// given frame inclusion L1-block number. If an older frame tx has already been
// seen, the timeout is not updated.
func (c *channelBuilder) FramePublished(ts uint64) {
timeout := ts + c.cfg.ChannelSubTimeout
if c.timeout == 0 || c.timeout > timeout {
c.timeout = timeout
}
func (c *channelBuilder) FramePublished(l1BlockNum uint64) {
timeout := l1BlockNum + c.cfg.ChannelTimeout - c.cfg.SubSafetyMargin
c.updateTimeout(timeout)
}

// TimedOut returns whether the passed timestamp is after the channel timeout.
// If no timeout is set yet, it returns false.
func (c *channelBuilder) TimedOut(ts uint64) bool {
return c.timeout != 0 && ts >= c.timeout
// TimedOut returns whether the passed block number is at or after the channel
// timeout block. If no block timeout is set yet, it returns false.
func (c *channelBuilder) TimedOut(blockNum uint64) bool {
return c.timeout != 0 && blockNum >= c.timeout
}

// TriggerTimeout checks if the channel is timed out at the given L1 block
// number and in this case sets the channel as full with reason
// ErrChannelTimedOut.
func (c *channelBuilder) TriggerTimeout(ts uint64) {
if !c.IsFull() && c.TimedOut(ts) {
func (c *channelBuilder) TriggerTimeout(blockNum uint64) {
if !c.IsFull() && c.TimedOut(blockNum) {
c.setFullErr(ErrChannelTimedOut)
}
}
Expand Down Expand Up @@ -186,12 +179,20 @@ func (c *channelBuilder) AddBlock(block *types.Block) error {
return nil
}

// updateSwTimeout updates the block timeout with the sequencer window timeout
// derived from the batch's origin L1 block. The timeout is only updated if the
// derived sequencer window timeout is earlier than the current one, or if no
// timeout is set yet.
func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) {
if c.swTimeoutBlock != 0 {
return
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout)
}

// updateTimeout updates the timeout block to the given block number if it is
// earlier than the current block timeout, or if it is still unset.
func (c *channelBuilder) updateTimeout(timeoutBlockNum uint64) {
if c.timeout == 0 || c.timeout > timeoutBlockNum {
c.timeout = timeoutBlockNum
}
// TODO: subtract safety margin
c.swTimeoutBlock = uint64(batch.EpochNum) + c.cfg.SeqWindowSize
}

// InputTargetReached says whether the target amount of input data has been
Expand Down
18 changes: 9 additions & 9 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ type channelManager struct {
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID][]byte
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.L1BlockRef
confirmedTransactions map[txID]eth.BlockID
}

func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager {
return &channelManager{
log: log,
cfg: cfg,
pendingTransactions: make(map[txID][]byte),
confirmedTransactions: make(map[txID]eth.L1BlockRef),
confirmedTransactions: make(map[txID]eth.BlockID),
}
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *channelManager) TxFailed(id txID) {
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.L1BlockRef) {
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
s.log.Trace("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok {
s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
Expand All @@ -108,7 +108,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.L1BlockRef) {
}
delete(s.pendingTransactions, id)
s.confirmedTransactions[id] = inclusionBlock
s.pendingChannel.FramePublished(inclusionBlock.Time)
s.pendingChannel.FramePublished(inclusionBlock.Number)

// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
Expand All @@ -129,7 +129,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.L1BlockRef) {
func (s *channelManager) clearPendingChannel() {
s.pendingChannel = nil
s.pendingTransactions = make(map[txID][]byte)
s.confirmedTransactions = make(map[txID]eth.L1BlockRef)
s.confirmedTransactions = make(map[txID]eth.BlockID)
}

// pendingChannelIsTimedOut returns true if submitted channel has timed out.
Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *channelManager) nextTxData() ([]byte, txID, error) {
//
// It currently ignores the l1Head provided and doesn't track channel timeouts
// or the sequencer window span yet.
func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) {
dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))

Expand Down Expand Up @@ -224,7 +224,7 @@ func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
return s.nextTxData()
}

func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error {
func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error {
if s.pendingChannel != nil {
return nil
}
Expand All @@ -239,8 +239,8 @@ func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error {
return nil
}

func (s *channelManager) triggerTimeout(l1Head eth.L1BlockRef) {
s.pendingChannel.TriggerTimeout(l1Head.Time)
func (s *channelManager) triggerTimeout(l1Head eth.BlockID) {
s.pendingChannel.TriggerTimeout(l1Head.Number)
ferr := s.pendingChannel.FullErr()
s.log.Debug("timeout triggered",
"l1Head", l1Head,
Expand Down
47 changes: 17 additions & 30 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,10 @@ type CLIConfig struct {
// RollupRpc is the HTTP provider URL for the L2 rollup node.
RollupRpc string

// The maximum number of L1 blocks that the inclusion transactions of a
// channel's frames can span.
ChannelTimeout uint64

// ChannelSubTimeout is the maximum duration, in seconds, to attempt
// completing an opened channel. When reached, the channel is closed and all
// remaining frames are submitted. The batcher should set it shorter than
// the actual channel timeout (specified in number of L1 blocks), since
// submitting continued channel data to L1 is not instantaneous. It's not
// worth it to work with nearly timed-out channels.
ChannelSubTimeout uint64
// The batcher tx submission safety margin (in #L1-blocks) to subtract from
// a channel's timeout and sequencing window, to guarantee safe inclusion of
// a channel on L1.
SubSafetyMargin uint64

// PollInterval is the delay between querying L2 for more transaction
// and creating a new batch.
Expand Down Expand Up @@ -90,10 +83,6 @@ type CLIConfig struct {
// PrivateKey is the private key used to submit sequencer transactions.
PrivateKey string

// SequencerBatchInboxAddress is the address in which to send batch
// transactions.
SequencerBatchInboxAddress string

RPCConfig oprpc.CLIConfig

/* Optional Params */
Expand Down Expand Up @@ -147,26 +136,24 @@ func NewConfig(ctx *cli.Context) CLIConfig {
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name),
ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name),
ChannelSubTimeout: ctx.GlobalUint64(flags.ChannelSubTimeoutFlag.Name),
SubSafetyMargin: ctx.GlobalUint64(flags.SubSafetyMarginFlag.Name),
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),

/* Optional Flags */
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
SequencerBatchInboxAddress: ctx.GlobalString(flags.SequencerBatchInboxAddressFlag.Name),
RPCConfig: oprpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
RPCConfig: oprpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx),
}
}
34 changes: 11 additions & 23 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte
SignerFnFactory: signer,
Rollup: rcfg,
Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout,
ChannelSubTimeout: cfg.ChannelSubTimeout,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version
TargetNumFrames: cfg.TargetNumFrames,
ApproxComprRatio: cfg.ApproxComprRatio,
SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout,
SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version
TargetNumFrames: cfg.TargetNumFrames,
ApproxComprRatio: cfg.ApproxComprRatio,
},
}

Expand Down Expand Up @@ -245,12 +245,12 @@ func (l *BatchSubmitter) loop() {
for {
l1tip, err := l.l1Tip(l.ctx)
if err != nil {
l.log.Error("Failed to query L1 tip")
l.log.Error("Failed to query L1 tip", "error", err)
break
}

// Collect next transaction data
data, id, err := l.state.TxData(l1tip)
data, id, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
break // local for loop
Expand Down Expand Up @@ -288,20 +288,8 @@ func (l *BatchSubmitter) recordFailedTx(id txID, err error) {

func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
l.log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
// Unfortunately, a tx receipt doesn't include the timestamp, so we have to
// query the header.
l1ref, err := l.l1BlockRefByReceipt(l.ctx, receipt)
if err != nil {
// Very unlikely that tx sending worked but then we cannot get the
// header. Fall back to latest known L1 time to be on the safe side.
l1ref.Time = l.lastKnownL1Time
l.log.Warn("Failed to get block ref for successful batcher tx. Setting timestamp to latest know L1 block time.", "block_ref", l1ref)
} else {
l.lastKnownL1Time = l1ref.Time
}
// l1ref is guaranteed to have at least fields Hash, Number and Time set.
l.state.TxConfirmed(id, l1ref)

l1block := eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash}
l.state.TxConfirmed(id, l1block)
}

// l1Tip gets the current L1 tip as an L1BlockRef. The passed context is assumed
Expand Down
26 changes: 7 additions & 19 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,13 @@ var (
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"),
}
ChannelTimeoutFlag = cli.Uint64Flag{
Name: "channel-timeout",
Usage: "The maximum number of L1 blocks that the inclusion transactions of a channel's frames can span",
SubSafetyMarginFlag = cli.Uint64Flag{
Name: "sub-safety-margin",
Usage: "The batcher tx submission safety margin (in #L1-blocks) to subtract " +
"from a channel's timeout and sequencing window, to guarantee safe inclusion " +
"of a channel on L1.",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "CHANNEL_TIMEOUT"),
}
ChannelSubTimeoutFlag = cli.Uint64Flag{
Name: "channel-sub-timeout",
Usage: "The maximum duration (in seconds) to attempt completing an opened channel, as opposed to submitting L2 blocks into a new channel.",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "CHANNEL_SUB_TIMEOUT"),
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SUB_SAFETY_MARGIN"),
}
PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval",
Expand Down Expand Up @@ -75,12 +71,6 @@ var (
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "RESUBMISSION_TIMEOUT"),
}
SequencerBatchInboxAddressFlag = cli.StringFlag{
Name: "sequencer-batch-inbox-address",
Usage: "L1 Address to receive batch transactions",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SEQUENCER_BATCH_INBOX_ADDRESS"),
}

/* Optional flags */

Expand Down Expand Up @@ -131,13 +121,11 @@ var requiredFlags = []cli.Flag{
L1EthRpcFlag,
L2EthRpcFlag,
RollupRpcFlag,
ChannelTimeoutFlag,
ChannelSubTimeoutFlag,
SubSafetyMarginFlag,
PollIntervalFlag,
NumConfirmationsFlag,
SafeAbortNonceTooLowCountFlag,
ResubmissionTimeoutFlag,
SequencerBatchInboxAddressFlag,
}

var optionalFlags = []cli.Flag{
Expand Down
8 changes: 3 additions & 5 deletions op-e2e/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,10 @@ func TestMigration(t *testing.T) {
L2EthRpc: gethNode.WSEndpoint(),
RollupRpc: rollupNode.HTTPEndpoint(),
MaxL1TxSize: 120_000,
TargetL1TxSize: 1,
TargetL1TxSize: 624,
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
ChannelTimeout: deployCfg.ChannelTimeout,
ChannelSubTimeout: 24,
SubSafetyMargin: testSafetyMargin(deployCfg),
PollInterval: 50 * time.Millisecond,
NumConfirmations: 1,
ResubmissionTimeout: 5 * time.Second,
Expand All @@ -337,8 +336,7 @@ func TestMigration(t *testing.T) {
Level: "info",
Format: "text",
},
PrivateKey: hexPriv(secrets.Batcher),
SequencerBatchInboxAddress: deployCfg.BatchSenderAddress.String(),
PrivateKey: hexPriv(secrets.Batcher),
}, lgr.New("module", "batcher"))
require.NoError(t, err)
t.Cleanup(func() {
Expand Down
Loading

0 comments on commit a2189aa

Please sign in to comment.