Skip to content

Commit

Permalink
op-node: reset engine through events (ethereum-optimism#10961)
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda authored Jun 19, 2024
1 parent 258a480 commit 2d0e83a
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 64 deletions.
2 changes: 2 additions & 0 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri

metrics := &testutils.TestDerivationMetrics{}
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)
engineResetDeriver := engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg, synchronousEvents)

clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)

Expand Down Expand Up @@ -144,6 +145,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri

*rootDeriver = rollup.SynchronousDerivers{
syncDeriver,
engineResetDeriver,
engDeriv,
rollupNode,
clSync,
Expand Down
2 changes: 2 additions & 0 deletions op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func NewDriver(
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)
engineResetDeriver := engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg, synchronousEvents)
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)

var finalizer Finalizer
Expand Down Expand Up @@ -246,6 +247,7 @@ func NewDriver(

*rootDeriver = []rollup.Deriver{
syncDeriver,
engineResetDeriver,
engDeriv,
schedDeriv,
driver,
Expand Down
38 changes: 32 additions & 6 deletions op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,40 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) {
case rollup.EngineTemporaryErrorEvent:
s.Log.Warn("Derivation process temporary error", "err", x.Err)
s.Emitter.Emit(StepReqEvent{})
case engine.EngineResetConfirmedEvent:
s.onEngineConfirmedReset(x)
}
}

// onEngineConfirmedReset informs the safe-head listener of the reset, and only
// then confirms the reset to the derivation pipeline. On any listener failure
// we return early without confirming; the pipeline will re-trigger a reset as
// necessary.
func (s *SyncDeriver) onEngineConfirmedReset(x engine.EngineResetConfirmedEvent) {
	if s.SafeHeadNotifs != nil {
		if err := s.SafeHeadNotifs.SafeHeadReset(x.Safe); err != nil {
			s.Log.Error("Failed to warn safe-head notifier of safe-head reset", "safe", x.Safe)
			return
		}
		resetToGenesis := x.Safe.Number == s.Config.Genesis.L2.Number && x.Safe.Hash == s.Config.Genesis.L2.Hash
		if s.SafeHeadNotifs.Enabled() && resetToGenesis {
			// The rollup genesis block is always safe by definition. So if the pipeline resets
			// this far back we know we will process all safe head updates and can record genesis
			// as always safe from L1 genesis.
			// Note that it is not safe to use cfg.Genesis.L1 here as it is the block immediately
			// before the L2 genesis but the contracts may have been deployed earlier than that,
			// allowing creating a dispute game with a L1 head prior to cfg.Genesis.L1.
			l1Genesis, err := s.L1.L1BlockRefByNumber(s.Ctx, 0)
			if err != nil {
				s.Log.Error("Failed to retrieve L1 genesis, cannot notify genesis as safe block", "err", err)
				return
			}
			if err := s.SafeHeadNotifs.SafeHeadUpdated(x.Safe, l1Genesis.ID()); err != nil {
				s.Log.Error("Failed to notify safe-head listener of safe-head", "err", err)
				return
			}
		}
	}
	s.Derivation.ConfirmEngineReset()
}

func (s *SyncDeriver) onStepEvent() {
s.Log.Debug("Sync process step", "onto_origin", s.Derivation.Origin())
// Note: while we refactor the SyncStep to be entirely event-based we have an intermediate phase
Expand Down Expand Up @@ -474,12 +505,7 @@ func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) {
s.Derivation.Reset()
s.Finalizer.Reset()
s.Emitter.Emit(StepReqEvent{})
if err := engine.ResetEngine(s.Ctx, s.Log, s.Config, s.Engine, s.L1, s.L2, s.SyncCfg, s.SafeHeadNotifs); err != nil {
s.Log.Error("Derivation pipeline not ready, failed to reset engine", "err", err)
// Derivation-pipeline will return a new ResetError until we confirm the engine has been successfully reset.
return
}
s.Derivation.ConfirmEngineReset()
s.Emitter.Emit(engine.ResetEngineRequestEvent{})
}

type DeriverIdleEvent struct{}
Expand Down
94 changes: 38 additions & 56 deletions op-node/rollup/engine/engine_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,72 +7,54 @@ import (
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

type ResetL2 interface {
sync.L2Chain
derive.SystemConfigL2Fetcher
}
// ResetEngineRequestEvent requests the EngineResetDeriver to walk
// the L2 chain backwards until it finds a plausible unsafe head,
// and to find an L2 safe block that is guaranteed to still be from the L1 chain.
type ResetEngineRequestEvent struct{}

type ResetEngineControl interface {
SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
SetFinalizedHead(eth.L2BlockRef)
// String returns the event type name.
func (ev ResetEngineRequestEvent) String() string {
	return "reset-engine-request"
}

SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
SetPendingSafeL2Head(eth.L2BlockRef)
type EngineResetDeriver struct {
ctx context.Context
log log.Logger
cfg *rollup.Config
l1 sync.L1Chain
l2 sync.L2Chain
syncCfg *sync.Config

ResetBuildingState()
emitter rollup.EventEmitter
}

// ResetEngine walks the L2 chain backwards until it finds a plausible unsafe head,
// and an L2 safe block that is guaranteed to still be from the L1 chain.
func ResetEngine(ctx context.Context, log log.Logger, cfg *rollup.Config, ec ResetEngineControl, l1 sync.L1Chain, l2 ResetL2, syncCfg *sync.Config, safeHeadNotifs rollup.SafeHeadListener) error {
result, err := sync.FindL2Heads(ctx, cfg, l1, l2, log, syncCfg)
if err != nil {
return derive.NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
}
finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe
l1Origin, err := l1.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
if err != nil {
return derive.NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err))
}
if safe.Time < l1Origin.Time {
return derive.NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
safe, safe.Time, l1Origin, l1Origin.Time))
// NewEngineResetDeriver returns an EngineResetDeriver wired with the given
// dependencies and event emitter.
func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Config,
	l1 sync.L1Chain, l2 sync.L2Chain, syncCfg *sync.Config, emitter rollup.EventEmitter) *EngineResetDeriver {
	deriver := &EngineResetDeriver{
		emitter: emitter,
		syncCfg: syncCfg,
		l2:      l2,
		l1:      l1,
		cfg:     cfg,
		log:     log,
		ctx:     ctx,
	}
	return deriver
}

ec.SetUnsafeHead(unsafe)
ec.SetSafeHead(safe)
ec.SetPendingSafeL2Head(safe)
ec.SetFinalizedHead(finalized)
ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
ec.ResetBuildingState()

log.Debug("Reset of Engine is completed", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time,
"unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)

if safeHeadNotifs != nil {
if err := safeHeadNotifs.SafeHeadReset(safe); err != nil {
return err
}
if safeHeadNotifs.Enabled() && safe.Number == cfg.Genesis.L2.Number && safe.Hash == cfg.Genesis.L2.Hash {
// The rollup genesis block is always safe by definition. So if the pipeline resets this far back we know
// we will process all safe head updates and can record genesis as always safe from L1 genesis.
// Note that it is not safe to use cfg.Genesis.L1 here as it is the block immediately before the L2 genesis
// but the contracts may have been deployed earlier than that, allowing creating a dispute game
// with a L1 head prior to cfg.Genesis.L1
l1Genesis, err := l1.L1BlockRefByNumber(ctx, 0)
if err != nil {
return fmt.Errorf("failed to retrieve L1 genesis: %w", err)
}
if err := safeHeadNotifs.SafeHeadUpdated(safe, l1Genesis.ID()); err != nil {
return err
}
// OnEvent handles ResetEngineRequestEvent: it finds the L2 heads to restart
// from, then either emits a ForceEngineResetEvent carrying them, or a
// rollup.ResetEvent if the heads could not be determined.
// Fix: removed the stray `return nil` (residue of the deleted ResetEngine
// function in the diff render) — this function has no result parameters, so
// returning a value is a compile error.
func (d *EngineResetDeriver) OnEvent(ev rollup.Event) {
	switch ev.(type) {
	case ResetEngineRequestEvent:
		result, err := sync.FindL2Heads(d.ctx, d.cfg, d.l1, d.l2, d.log, d.syncCfg)
		if err != nil {
			d.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("failed to find the L2 Heads to start from: %w", err)})
			return
		}
		d.emitter.Emit(ForceEngineResetEvent{
			Unsafe:    result.Unsafe,
			Safe:      result.Safe,
			Finalized: result.Finalized,
		})
	}
}
45 changes: 45 additions & 0 deletions op-node/rollup/engine/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ func (ev TryUpdateEngineEvent) String() string {
return "try-update-engine"
}

// ForceEngineResetEvent instructs the engine-deriver to unconditionally apply
// the given unsafe/safe/finalized heads to the engine state.
type ForceEngineResetEvent struct {
	Unsafe, Safe, Finalized eth.L2BlockRef
}

// String returns the event type name.
func (ev ForceEngineResetEvent) String() string {
	return "force-engine-reset"
}

// EngineResetConfirmedEvent signals that the engine state was reset to the
// given unsafe/safe/finalized heads.
type EngineResetConfirmedEvent struct {
	Unsafe, Safe, Finalized eth.L2BlockRef
}

// String returns the event type name.
func (ev EngineResetConfirmedEvent) String() string {
	return "engine-reset-confirmed"
}

type EngDeriver struct {
log log.Logger
cfg *rollup.Config
Expand Down Expand Up @@ -146,5 +162,34 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) {
SafeL2Head: d.ec.SafeL2Head(),
FinalizedL2Head: d.ec.Finalized(),
})
case ForceEngineResetEvent:
ForceEngineReset(d.ec, x)

// Time to apply the changes to the underlying engine
d.emitter.Emit(TryUpdateEngineEvent{})

log.Debug("Reset of Engine is completed",
"safeHead", x.Safe, "unsafe", x.Unsafe, "safe_timestamp", x.Safe.Time,
"unsafe_timestamp", x.Unsafe.Time)
d.emitter.Emit(EngineResetConfirmedEvent{})
}
}

// ResetEngineControl is the subset of engine-controller state setters needed
// to force-reset the engine to a new set of chain heads.
type ResetEngineControl interface {
	SetUnsafeHead(eth.L2BlockRef)
	SetSafeHead(eth.L2BlockRef)
	SetFinalizedHead(eth.L2BlockRef)
	SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
	SetPendingSafeL2Head(eth.L2BlockRef)
	ResetBuildingState()
}

// ForceEngineReset applies the heads from the given event directly to the
// engine control.
// Deprecated: not intended for general use. The op-program needs it for now,
// until event processing is adopted there.
func ForceEngineReset(ec ResetEngineControl, x ForceEngineResetEvent) {
	ec.SetUnsafeHead(x.Unsafe)
	ec.SetSafeHead(x.Safe)
	// The pending-safe head restarts at the safe head.
	ec.SetPendingSafeL2Head(x.Safe)
	ec.SetFinalizedHead(x.Finalized)
	// Clear any backup unsafe head (no reorg trigger) and abandon block-building state.
	ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
	ec.ResetBuildingState()
}
13 changes: 11 additions & 2 deletions op-program/client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,18 @@ func (d *MinimalSyncDeriver) SyncStep(ctx context.Context) error {
if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
return err
}
if err := engine.ResetEngine(ctx, d.logger, d.cfg, d.engine, d.l1Source, d.l2Source, d.syncCfg, nil); err != nil {
return err
// The below two calls emulate ResetEngine, without event-processing.
// This will be omitted after op-program adopts events, and the deriver code is used instead.
result, err := sync.FindL2Heads(ctx, d.cfg, d.l1Source, d.l2Source, d.logger, d.syncCfg)
if err != nil {
// not really a temporary error in this context, but preserves old ResetEngine behavior.
return derive.NewTemporaryError(fmt.Errorf("failed to determine starting point: %w", err))
}
engine.ForceEngineReset(d.engine, engine.ForceEngineResetEvent{
Unsafe: result.Unsafe,
Safe: result.Safe,
Finalized: result.Finalized,
})
d.pipeline.ConfirmEngineReset()
d.initialResetDone = true
}
Expand Down

0 comments on commit 2d0e83a

Please sign in to comment.