Skip to content

Commit

Permalink
op-wheel: engine command logic
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda committed Dec 7, 2022
1 parent c030d85 commit ea8c462
Show file tree
Hide file tree
Showing 2 changed files with 365 additions and 0 deletions.
278 changes: 278 additions & 0 deletions op-wheel/engine/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
package engine

import (
"context"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
)

func DialClient(ctx context.Context, endpoint string, jwtSecret [32]byte) (client.RPC, error) {
auth := node.NewJWTAuth(jwtSecret)

rpcClient, err := rpc.DialOptions(ctx, endpoint, rpc.WithHTTPAuth(auth))
if err != nil {
return nil, fmt.Errorf("failed to dial engine endpoint: %w", err)
}
return client.NewBaseRPCClient(rpcClient), nil
}

type RPCBlock struct {
types.Header
Transactions []*types.Transaction `json:"transactions"`
}

func getBlock(ctx context.Context, client client.RPC, method string, tag string) (*types.Block, error) {
var bl *RPCBlock
err := client.CallContext(ctx, &bl, method, tag, true)
if err != nil {
return nil, err
}
return types.NewBlockWithHeader(&bl.Header).WithBody(bl.Transactions, nil), nil
}

func getHeader(ctx context.Context, client client.RPC, method string, tag string) (*types.Header, error) {
var header *types.Header
err := client.CallContext(ctx, &header, method, tag, false)
if err != nil {
return nil, err
}
return header, nil
}

func headSafeFinalized(ctx context.Context, client client.RPC) (head *types.Block, safe, finalized *types.Header, err error) {
head, err = getBlock(ctx, client, "eth_getBlockByNumber", "latest")
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to get block: %w", err)
}
safe, err = getHeader(ctx, client, "eth_getBlockByNumber", "safe")
if err != nil {
return head, nil, nil, fmt.Errorf("failed to get safe block: %w", err)
}
finalized, err = getHeader(ctx, client, "eth_getBlockByNumber", "finalized")
if err != nil {
return head, safe, nil, fmt.Errorf("failed to get finalized block: %w", err)
}
return head, safe, finalized, nil
}

func insertBlock(ctx context.Context, client client.RPC, payload *beacon.ExecutableDataV1) error {
var payloadResult *beacon.PayloadStatusV1
if err := client.CallContext(ctx, &payloadResult, "engine_newPayloadV1", payload); err != nil {
return fmt.Errorf("failed to insert block %d: %w", payload.Number, err)
}
if payloadResult.Status != string(eth.ExecutionValid) {
return fmt.Errorf("block insertion was not valid: %v", payloadResult.ValidationError)
}
return nil
}

func updateForkchoice(ctx context.Context, client client.RPC, head, safe, finalized common.Hash) error {
var post beacon.ForkChoiceResponse
if err := client.CallContext(ctx, &post, "engine_forkchoiceUpdatedV1",
beacon.ForkchoiceStateV1{
HeadBlockHash: head,
SafeBlockHash: safe,
FinalizedBlockHash: finalized,
}, nil); err != nil {
return fmt.Errorf("failed to set forkchoice with new block %s: %w", head, err)
}
if post.PayloadStatus.Status != string(eth.ExecutionValid) {
return fmt.Errorf("post-block forkchoice update was not valid: %v", post.PayloadStatus.ValidationError)
}
return nil
}

type BlockBuildingSettings struct {
BlockTime uint64
Random common.Hash
FeeRecipient common.Address
BuildTime time.Duration
}

func BuildBlock(ctx context.Context, client client.RPC, status *StatusData, settings *BlockBuildingSettings) (*beacon.ExecutableDataV1, error) {
var pre beacon.ForkChoiceResponse
if err := client.CallContext(ctx, &pre, "engine_forkchoiceUpdatedV1",
beacon.ForkchoiceStateV1{
HeadBlockHash: status.Head.Hash,
SafeBlockHash: status.Safe.Hash,
FinalizedBlockHash: status.Finalized.Hash,
}, beacon.PayloadAttributesV1{
Timestamp: status.Head.Time + settings.BlockTime,
Random: settings.Random,
SuggestedFeeRecipient: settings.FeeRecipient,
// TODO: maybe use the L2 fields to hack in tx embedding CLI option?
//Transactions: nil,
//NoTxPool: false,
//GasLimit: nil,
}); err != nil {
return nil, fmt.Errorf("failed to set forkchoice with new block: %w", err)
}
if pre.PayloadStatus.Status != string(eth.ExecutionValid) {
return nil, fmt.Errorf("pre-block forkchoice update was not valid: %v", pre.PayloadStatus.ValidationError)
}

// wait some time for the block to get built
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(settings.BuildTime):
}

var payload *beacon.ExecutableDataV1
if err := client.CallContext(ctx, &payload, "engine_getPayloadV1", pre.PayloadID); err != nil {
return nil, fmt.Errorf("failed to get payload %v, %d time after instructing engine to build it: %w", pre.PayloadID, settings.BuildTime, err)
}

if err := insertBlock(ctx, client, payload); err != nil {
return nil, err
}
if err := updateForkchoice(ctx, client, payload.BlockHash, status.Safe.Hash, status.Finalized.Hash); err != nil {
return nil, err
}

return payload, nil
}

func Auto(ctx context.Context, metrics Metricer, client client.RPC, log log.Logger, shutdown <-chan struct{}, settings *BlockBuildingSettings) error {
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()

var lastPayload *beacon.ExecutableDataV1
var buildErr error
for {
select {
case <-shutdown:
log.Info("shutting down")
return nil
case <-ctx.Done():
log.Info("context closed", "err", ctx.Err())
return ctx.Err()
case now := <-ticker.C:
blockTime := time.Duration(settings.BlockTime) * time.Second
lastTime := uint64(0)
if lastPayload != nil {
lastTime = lastPayload.Timestamp
}
buildTriggerTime := time.Unix(int64(lastTime), 0).Add(blockTime - settings.BuildTime)

if lastPayload == nil || now.After(buildTriggerTime) {
buildTime := settings.BuildTime
// don't waste time on trying to include txs if we are lagging behind at least a block,
// but don't go ham if we are failing to build blocks already.
if buildErr == nil && now.After(buildTriggerTime.Add(blockTime)) {
buildTime = 10 * time.Millisecond
}
buildErr = nil
status, err := Status(ctx, client)
if err != nil {
log.Error("failed to get pre-block engine status", "err", err)
metrics.RecordBlockFail()
buildErr = err
continue
}
log.Info("status", "head", status.Head, "safe", status.Safe, "finalized", status.Finalized,
"head_time", status.Head.Time, "txs", status.Txs, "gas", status.Gas, "basefee", status.Gas)

// On a mocked "beacon epoch transition", update finalization and justification checkpoints.
// There are no gap slots, so we just go back 32 blocks.
if status.Head.Number%32 == 0 {
if status.Safe.Number+32 <= status.Head.Number {
safe, err := getHeader(ctx, client, "eth_getBlockByNumber", hexutil.Uint64(status.Head.Number-32).String())
if err != nil {
buildErr = err
log.Error("failed to find block for new safe block progress", "err", err)
continue
}
status.Safe = eth.L1BlockRef{Hash: safe.Hash(), Number: safe.Number.Uint64(), Time: safe.Time, ParentHash: safe.ParentHash}
}
if status.Finalized.Number+32 <= status.Safe.Number {
finalized, err := getHeader(ctx, client, "eth_getBlockByNumber", hexutil.Uint64(status.Safe.Number-32).String())
if err != nil {
buildErr = err
log.Error("failed to find block for new finalized block progress", "err", err)
continue
}
status.Finalized = eth.L1BlockRef{Hash: finalized.Hash(), Number: finalized.Number.Uint64(), Time: finalized.Time, ParentHash: finalized.ParentHash}
}
}

payload, err := BuildBlock(ctx, client, status, &BlockBuildingSettings{
BlockTime: settings.BlockTime,
Random: settings.Random,
FeeRecipient: settings.FeeRecipient,
BuildTime: buildTime,
})
if err != nil {
buildErr = err
log.Error("failed to produce block", "err", err)
metrics.RecordBlockFail()
} else {
lastPayload = payload
log.Info("created block", "hash", payload.BlockHash, "number", payload.Number,
"timestamp", payload.Timestamp, "txs", len(payload.Transactions),
"gas", payload.GasUsed, "basefee", payload.BaseFeePerGas)
basefee, _ := new(big.Float).SetInt(payload.BaseFeePerGas).Float64()
metrics.RecordBlockStats(payload.BlockHash, payload.Number, payload.Timestamp, uint64(len(payload.Transactions)), payload.GasUsed, basefee)
}
}
}
}
}

type StatusData struct {
Head eth.L1BlockRef `json:"head"`
Safe eth.L1BlockRef `json:"safe"`
Finalized eth.L1BlockRef `json:"finalized"`
Txs uint64 `json:"txs"`
Gas uint64 `json:"gas"`
StateRoot common.Hash `json:"stateRoot"`
BaseFee *big.Int `json:"baseFee"`
}

func Status(ctx context.Context, client client.RPC) (*StatusData, error) {
head, safe, finalized, err := headSafeFinalized(ctx, client)
if err != nil {
return nil, err
}
return &StatusData{
Head: eth.L1BlockRef{Hash: head.Hash(), Number: head.NumberU64(), Time: head.Time(), ParentHash: head.ParentHash()},
Safe: eth.L1BlockRef{Hash: safe.Hash(), Number: safe.Number.Uint64(), Time: safe.Time, ParentHash: safe.ParentHash},
Finalized: eth.L1BlockRef{Hash: finalized.Hash(), Number: finalized.Number.Uint64(), Time: finalized.Time, ParentHash: finalized.ParentHash},
Txs: uint64(len(head.Transactions())),
Gas: head.GasUsed(),
StateRoot: head.Root(),
BaseFee: head.BaseFee(),
}, nil
}

// Copy takes the forkchoice state of copyFrom, and applies it to copyTo, and inserts the head-block.
// The destination engine should then start syncing to this new chain if it has peers to do so.
func Copy(ctx context.Context, copyFrom client.RPC, copyTo client.RPC) error {
copyHead, copySafe, copyFinalized, err := headSafeFinalized(ctx, copyFrom)
if err != nil {
return err
}
payload := beacon.BlockToExecutableData(copyHead)
if err := updateForkchoice(ctx, copyTo, copyHead.ParentHash(), copySafe.Hash(), copyFinalized.Hash()); err != nil {
return err
}
if err := insertBlock(ctx, copyTo, payload); err != nil {
return err
}
if err := updateForkchoice(ctx, copyTo, payload.BlockHash, copySafe.Hash(), copyFinalized.Hash()); err != nil {
return err
}
return nil
}
87 changes: 87 additions & 0 deletions op-wheel/engine/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package engine

import (
"encoding/binary"

"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var Namespace = "op_node"

type Metricer interface {
RecordBlockFail()
RecordBlockStats(hash common.Hash, num uint64, time uint64, txs uint64, gas uint64, baseFee float64)
}

type Metrics struct {
BlockFails prometheus.Counter

BlockHash prometheus.Gauge
BlockNum prometheus.Gauge
BlockTime prometheus.Gauge
BlockTxs prometheus.Gauge
BlockGas prometheus.Gauge
BlockBaseFee prometheus.Gauge
}

func NewMetrics(procName string, registry *prometheus.Registry) *Metrics {
if procName == "" {
procName = "default"
}
ns := Namespace + "_" + procName
return &Metrics{
BlockFails: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "engine",
Name: "block_fails",
Help: "Total block building attempts that fail",
}),
BlockHash: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "block_hash",
Help: "current head block hash",
}),
BlockNum: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "block_num",
Help: "current head block number",
}),
BlockTime: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "block_time",
Help: "current head block time",
}),
BlockTxs: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "block_txs",
Help: "current head block txs",
}),
BlockGas: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "block_gas",
Help: "current head block gas",
}),
BlockBaseFee: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "block_base_fee",
Help: "current head block basefee",
}),
}
}

func (r *Metrics) RecordBlockFail() {
r.BlockFails.Inc()
}

func (r *Metrics) RecordBlockStats(hash common.Hash, num uint64, time uint64, txs uint64, gas uint64, baseFee float64) {
r.BlockHash.Set(float64(binary.LittleEndian.Uint64(hash[:]))) // for pretty block-color changing charts
r.BlockNum.Set(float64(num))
r.BlockTime.Set(float64(time))
r.BlockTxs.Set(float64(txs))
r.BlockGas.Set(float64(gas))
r.BlockGas.Set(float64(baseFee))
}

var _ Metricer = (*Metrics)(nil)

0 comments on commit ea8c462

Please sign in to comment.