Skip to content

Commit

Permalink
op-node: Fully remove the progress API (ethereum-optimism#3623)
Browse files Browse the repository at this point in the history
* op-node: Clean up pipeline

Now that the engine queue is the only step stage, it is easy to
consolidate different loops inside the derivation pipeline.

* op-node: Fully remove the progress API

It has been partially replaced with the Origin API, but the open/closed
distinction no longer exists.

Co-authored-by: Matthew Slipper <[email protected]>
  • Loading branch information
trianglesphere and mslipper authored Oct 4, 2022
1 parent f39853e commit 46202cd
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 225 deletions.
2 changes: 1 addition & 1 deletion op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewL2Verifier(log log.Logger, l1 derive.L1Fetcher, eng derive.Engine, cfg *

func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{
CurrentL1: s.derivation.Progress().Origin,
CurrentL1: s.derivation.Origin(),
HeadL1: s.l1Head,
SafeL1: s.l1Safe,
FinalizedL1: s.l1Finalized,
Expand Down
6 changes: 0 additions & 6 deletions op-node/rollup/derive/batch_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ import (
// It is internally responsible for making sure that batches with L1 inclusions block outside it's
// working range are not considered or pruned.

type BatchQueueOutput interface {
StageProgress
AddBatch(batch *BatchData)
SafeL2Head() eth.L2BlockRef
}

type NextBatchProvider interface {
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (*BatchData, error)
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/derive/channel_bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ChannelBank struct {
fetcher L1Fetcher
}

var _ PullStage = (*ChannelBank)(nil)
var _ ResetableStage = (*ChannelBank)(nil)

// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank {
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/derive/channel_in_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ChannelInReader struct {
prev *ChannelBank
}

var _ PullStage = (*ChannelInReader)(nil)
var _ ResetableStage = (*ChannelInReader)(nil)

// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader {
Expand Down
28 changes: 14 additions & 14 deletions op-node/rollup/derive/engine_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,14 @@ type EngineQueue struct {
engine Engine
prev NextAttributesProvider

progress Progress // only used for pipeline resets
origin eth.L1BlockRef // only used for pipeline resets

metrics Metrics
metrics Metrics
l1Fetcher L1Fetcher
}

// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
Expand All @@ -95,12 +96,13 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
},
prev: prev,
prev: prev,
l1Fetcher: l1Fetcher,
}
}

func (eq *EngineQueue) Progress() Progress {
return eq.progress
func (eq *EngineQueue) Origin() eth.L1BlockRef {
return eq.origin
}

func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
Expand Down Expand Up @@ -151,7 +153,7 @@ func (eq *EngineQueue) LastL2Time() uint64 {
return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp)
}

func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error {
func (eq *EngineQueue) Step(ctx context.Context) error {
if len(eq.safeAttributes) > 0 {
return eq.tryNextSafeAttributes(ctx)
}
Expand Down Expand Up @@ -402,13 +404,13 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {

// ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical.
func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
result, err := sync.FindL2Heads(ctx, eq.cfg, l1Fetcher, eq.engine)
func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error {
result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
}
finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe
l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
l1Origin, err := eq.l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err))
}
Expand All @@ -421,7 +423,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
if l1Origin.Number < eq.cfg.ChannelTimeout {
pipelineNumber = 0
}
pipelineOrigin, err := l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber)
pipelineOrigin, err := eq.l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", pipelineNumber, err))
}
Expand All @@ -431,9 +433,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
eq.finalized = finalized
eq.finalityData = eq.finalityData[:0]
// note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.progress = Progress{
Origin: pipelineOrigin,
}
eq.origin = pipelineOrigin
eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe)
eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
Expand Down
10 changes: 5 additions & 5 deletions op-node/rollup/derive/engine_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,21 +230,21 @@ func TestEngineQueue_Finalize(t *testing.T) {

prev := &fakeAttributesQueue{}

eq := NewEngineQueue(logger, cfg, eng, metrics, prev)
require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF)
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F)
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}), io.EOF)

require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")

// now say C1 was included in D and became the new safe head
eq.progress.Origin = refD
eq.origin = refD
prev.origin = refD
eq.safeHead = refC1
eq.postProcessSafeL2()

// now say D0 was included in E and became the new safe head
eq.progress.Origin = refE
eq.origin = refE
prev.origin = refE
eq.safeHead = refD0
eq.postProcessSafeL2()
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/derive/l1_retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type L1Retrieval struct {
datas DataIter
}

var _ PullStage = (*L1Retrieval)(nil)
var _ ResetableStage = (*L1Retrieval)(nil)

func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, prev NextBlockProvider) *L1Retrieval {
return &L1Retrieval{
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/derive/l1_traversal.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type L1Traversal struct {
log log.Logger
}

var _ PullStage = (*L1Traversal)(nil)
var _ ResetableStage = (*L1Traversal)(nil)

func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Traversal {
return &L1Traversal{
Expand Down
125 changes: 35 additions & 90 deletions op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,46 +24,22 @@ type L1Fetcher interface {
L1TransactionFetcher
}

type StageProgress interface {
Progress() Progress
}

type PullStage interface {
type ResetableStage interface {
// Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to.
// TODO: Return L1 Block reference
Reset(ctx context.Context, base eth.L1BlockRef) error
}

type Stage interface {
StageProgress

// Step tries to progress the state.
// The outer stage progress informs the step what to do.
//
// If the stage:
// - returns EOF: the stage will be skipped
// - returns another error: the stage will make the pipeline error.
// - returns nil: the stage will be repeated next Step
Step(ctx context.Context, outer Progress) error

// ResetStep prepares the state for usage in regular steps.
// Similar to Step(ctx) it returns:
// - EOF if the next stage should be reset
// - error if the reset should start all over again
// - nil if the reset should continue resetting this stage.
ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
}

type EngineQueueStage interface {
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
Progress() Progress
Origin() eth.L1BlockRef
SetUnsafeHead(head eth.L2BlockRef)

Finalize(l1Origin eth.BlockID)
AddSafeAttributes(attributes *eth.PayloadAttributes)
AddUnsafePayload(payload *eth.ExecutionPayload)
Step(context.Context) error
}

// DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to keep the L2 Engine in sync.
Expand All @@ -74,19 +50,12 @@ type DerivationPipeline struct {

// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
resetting int
pullResetIdx int

// Index of the stage that is currently being processed.
active int
resetting int
stages []ResetableStage

// stages in execution order. A stage Step that:
stages []Stage

pullStages []PullStage
traversal *L1Traversal

eng EngineQueueStage
// Special stages to keep track of
traversal *L1Traversal
eng EngineQueueStage

metrics Metrics
}
Expand All @@ -103,33 +72,32 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
batchQueue := NewBatchQueue(log, cfg, chInReader)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue)

// Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages)
eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue)
// Step stages
eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher)

stages := []Stage{eng}
pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other.
// Note: The engine queue stage is the only reset that can fail.
stages := []ResetableStage{eng, l1Traversal, l1Src, bank, chInReader, batchQueue, attributesQueue}

return &DerivationPipeline{
log: log,
cfg: cfg,
l1Fetcher: l1Fetcher,
resetting: 0,
active: 0,
stages: stages,
pullStages: pullStages,
eng: eng,
metrics: metrics,
traversal: l1Traversal,
log: log,
cfg: cfg,
l1Fetcher: l1Fetcher,
resetting: 0,
stages: stages,
eng: eng,
metrics: metrics,
traversal: l1Traversal,
}
}

func (dp *DerivationPipeline) Reset() {
dp.resetting = 0
dp.pullResetIdx = 0
}

func (dp *DerivationPipeline) Progress() Progress {
return dp.eng.Progress()
func (dp *DerivationPipeline) Origin() eth.L1BlockRef {
return dp.eng.Origin()
}

func (dp *DerivationPipeline) Finalize(l1Origin eth.BlockID) {
Expand Down Expand Up @@ -165,12 +133,12 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
// An error is expected when the underlying source closes.
// When Step returns nil, it should be called again, to continue the derivation process.
func (dp *DerivationPipeline) Step(ctx context.Context) error {
defer dp.metrics.RecordL1Ref("l1_derived", dp.Progress().Origin)
defer dp.metrics.RecordL1Ref("l1_derived", dp.Origin())

// if any stages need to be reset, do that first.
if dp.resetting < len(dp.stages) {
if err := dp.stages[dp.resetting].ResetStep(ctx, dp.l1Fetcher); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.stages[dp.resetting].Progress().Origin)
if err := dp.stages[dp.resetting].Reset(ctx, dp.eng.Origin()); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.eng.Origin())
dp.resetting += 1
return nil
} else if err != nil {
Expand All @@ -179,37 +147,14 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error {
return nil
}
}
// Then reset the pull based stages
if dp.pullResetIdx < len(dp.pullStages) {
// Use the last stage's progress as the one to pull from
inner := dp.stages[len(dp.stages)-1].Progress()

// Do the reset
if err := dp.pullStages[dp.pullResetIdx].Reset(ctx, inner.Origin); err == io.EOF {
// dp.log.Debug("reset of stage completed", "stage", dp.pullResetIdx, "origin", dp.pullStages[dp.pullResetIdx].Progress().Origin)
dp.pullResetIdx += 1
return nil
} else if err != nil {
return fmt.Errorf("stage %d failed resetting: %w", dp.pullResetIdx, err)
} else {
return nil
}
}

// Lastly advance the stages
for i, stage := range dp.stages {
var outer Progress
if i+1 < len(dp.stages) {
outer = dp.stages[i+1].Progress()
}
if err := stage.Step(ctx, outer); err == io.EOF {
continue
} else if err != nil {
return fmt.Errorf("stage %d failed: %w", i, err)
} else {
return nil
}
// Now step the engine queue. It will pull earlier data as needed.
if err := dp.eng.Step(ctx); err == io.EOF {
// If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx)
} else if err != nil {
return fmt.Errorf("engine stage failed: %w", err)
} else {
return nil
}
// If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx)
}
Loading

0 comments on commit 46202cd

Please sign in to comment.