Skip to content

Commit

Permalink
op-node,op-service: Add Fallback Beacon Client (ethereum-optimism#9458)
Browse files Browse the repository at this point in the history
* op-node,op-service: Add Fallback Beacon Client

This splits out a very thin wrapper around the Beacon API into the
BeaconClient interface. This interface is then implement by two different
structs. The first is a simple wrapper around the HTTP Client. The second
is a FallBack Client which tries two different underlying clients.

This also a beacon archiver to be used as a fallback while still pulling most
of the relevant data from the beacon node.

* Try all clients in the pool
  • Loading branch information
trianglesphere authored Feb 13, 2024
1 parent 061f0b8 commit 39bc6f4
Show file tree
Hide file tree
Showing 10 changed files with 597 additions and 37 deletions.
2 changes: 1 addition & 1 deletion op-e2e/l1_beacon_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestGetVersion(t *testing.T) {
require.NoError(t, beaconApi.Start("127.0.0.1:0"))

beaconCfg := sources.L1BeaconClientConfig{FetchAllSidecars: false}
cl := sources.NewL1BeaconClient(client.NewBasicHTTPClient(beaconApi.BeaconAddr(), l), beaconCfg)
cl := sources.NewL1BeaconClient(sources.NewBeaconHTTPClient(client.NewBasicHTTPClient(beaconApi.BeaconAddr(), l)), beaconCfg)

version, err := cl.GetVersion(context.Background())
require.NoError(t, err)
Expand Down
7 changes: 7 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ var (
Required: false,
EnvVars: prefixEnvVars("L1_BEACON"),
}
BeaconArchiverAddr = &cli.StringFlag{
Name: "l1.beacon-archiver",
Usage: "Address of L1 Beacon-node compatible HTTP endpoint to use. This is used to fetch blobs that the --l1.beacon does not have (i.e expired blobs).",
Required: false,
EnvVars: prefixEnvVars("L1_BEACON_ARCHIVER"),
}
BeaconCheckIgnore = &cli.BoolFlag{
Name: "l1.beacon.ignore",
Usage: "When false, halts op-node startup if the healthcheck to the Beacon-node endpoint fails.",
Expand Down Expand Up @@ -292,6 +298,7 @@ var requiredFlags = []cli.Flag{

var optionalFlags = []cli.Flag{
BeaconAddr,
BeaconArchiverAddr,
BeaconCheckIgnore,
BeaconFetchAllSidecars,
SyncModeFlag,
Expand Down
12 changes: 9 additions & 3 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type L1EndpointSetup interface {
}

type L1BeaconEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl client.HTTP, err error)
Setup(ctx context.Context, log log.Logger) (cl sources.BeaconClient, fb []sources.BlobSideCarsFetcher, err error)
// ShouldIgnoreBeaconCheck returns true if the Beacon-node version check should not halt startup.
ShouldIgnoreBeaconCheck() bool
ShouldFetchAllSidecars() bool
Expand Down Expand Up @@ -177,14 +177,20 @@ func (cfg *PreparedL1Endpoint) Check() error {

type L1BeaconEndpointConfig struct {
BeaconAddr string // Address of L1 User Beacon-API endpoint to use (beacon namespace required)
BeaconArchiverAddr string // Address of L1 User Beacon-API Archive endpoint to use for expired blobs (beacon namespace required)
BeaconCheckIgnore bool // When false, halt startup if the beacon version endpoint fails
BeaconFetchAllSidecars bool // Whether to fetch all blob sidecars and filter locally
}

var _ L1BeaconEndpointSetup = (*L1BeaconEndpointConfig)(nil)

func (cfg *L1BeaconEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.HTTP, err error) {
return client.NewBasicHTTPClient(cfg.BeaconAddr, log), nil
func (cfg *L1BeaconEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl sources.BeaconClient, fb []sources.BlobSideCarsFetcher, err error) {
a := client.NewBasicHTTPClient(cfg.BeaconAddr, log)
if cfg.BeaconArchiverAddr != "" {
b := client.NewBasicHTTPClient(cfg.BeaconArchiverAddr, log)
fb = append(fb, sources.NewBeaconHTTPClient(b))
}
return sources.NewBeaconHTTPClient(a), fb, nil
}

func (cfg *L1BeaconEndpointConfig) Check() error {
Expand Down
4 changes: 2 additions & 2 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,14 +303,14 @@ func (n *OpNode) initL1BeaconAPI(ctx context.Context, cfg *Config) error {

// We always initialize a client. We will get an error on requests if the client does not work.
// This way the op-node can continue non-L1 functionality when the user chooses to ignore the Beacon API requirement.
httpClient, err := cfg.Beacon.Setup(ctx, n.log)
beaconClient, fallbacks, err := cfg.Beacon.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L1 Beacon API client: %w", err)
}
beaconCfg := sources.L1BeaconClientConfig{
FetchAllSidecars: cfg.Beacon.ShouldFetchAllSidecars(),
}
n.beacon = sources.NewL1BeaconClient(httpClient, beaconCfg)
n.beacon = sources.NewL1BeaconClient(beaconClient, beaconCfg, fallbacks...)

// Retry retrieval of the Beacon API version, to be more robust on startup against Beacon API connection issues.
beaconVersion, missingEndpoint, err := retry.Do2[string, bool](ctx, 5, retry.Exponential(), func() (string, bool, error) {
Expand Down
1 change: 1 addition & 0 deletions op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
func NewBeaconEndpointConfig(ctx *cli.Context) node.L1BeaconEndpointSetup {
return &node.L1BeaconEndpointConfig{
BeaconAddr: ctx.String(flags.BeaconAddr.Name),
BeaconArchiverAddr: ctx.String(flags.BeaconArchiverAddr.Name),
BeaconCheckIgnore: ctx.Bool(flags.BeaconCheckIgnore.Name),
BeaconFetchAllSidecars: ctx.Bool(flags.BeaconFetchAllSidecars.Name),
}
Expand Down
2 changes: 1 addition & 1 deletion op-program/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *
if err != nil {
return nil, fmt.Errorf("failed to create L1 client: %w", err)
}
l1Beacon := client.NewBasicHTTPClient(cfg.L1BeaconURL, logger)
l1Beacon := sources.NewBeaconHTTPClient(client.NewBasicHTTPClient(cfg.L1BeaconURL, logger))
l1BlobFetcher := sources.NewL1BeaconClient(l1Beacon, sources.L1BeaconClientConfig{FetchAllSidecars: false})
l2Cl, err := NewL2Client(l2RPC, logger, nil, &L2ClientConfig{L2ClientConfig: l2ClCfg, L2Head: cfg.L2Head})
if err != nil {
Expand Down
164 changes: 134 additions & 30 deletions op-service/sources/l1_beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sources
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -19,29 +20,52 @@ import (

const (
versionMethod = "eth/v1/node/version"
genesisMethod = "eth/v1/beacon/genesis"
specMethod = "eth/v1/config/spec"
genesisMethod = "eth/v1/beacon/genesis"
sidecarsMethodPrefix = "eth/v1/beacon/blob_sidecars/"
)

type L1BeaconClientConfig struct {
FetchAllSidecars bool
}

// L1BeaconClient is a high level golang client for the Beacon API.
type L1BeaconClient struct {
cl client.HTTP
cfg L1BeaconClientConfig
cl BeaconClient
pool *ClientPool[BlobSideCarsFetcher]
cfg L1BeaconClientConfig

initLock sync.Mutex
timeToSlotFn TimeToSlotFn
}

// NewL1BeaconClient returns a client for making requests to an L1 consensus layer node.
func NewL1BeaconClient(cl client.HTTP, cfg L1BeaconClientConfig) *L1BeaconClient {
return &L1BeaconClient{cl: cl, cfg: cfg}
// BeaconClient is a thin wrapper over the Beacon APIs.
//
//go:generate mockery --name BeaconClient --with-expecter=true
type BeaconClient interface {
NodeVersion(ctx context.Context) (string, error)
ConfigSpec(ctx context.Context) (eth.APIConfigResponse, error)
BeaconGenesis(ctx context.Context) (eth.APIGenesisResponse, error)
BeaconBlobSideCars(ctx context.Context, fetchAllSidecars bool, slot uint64, hashes []eth.IndexedBlobHash) (eth.APIGetBlobSidecarsResponse, error)
}

// BlobSideCarsFetcher is a thin wrapper over the Beacon APIs.
//
//go:generate mockery --name BlobSideCarsFetcher --with-expecter=true
type BlobSideCarsFetcher interface {
BeaconBlobSideCars(ctx context.Context, fetchAllSidecars bool, slot uint64, hashes []eth.IndexedBlobHash) (eth.APIGetBlobSidecarsResponse, error)
}

func (cl *L1BeaconClient) apiReq(ctx context.Context, dest any, reqPath string, reqQuery url.Values) error {
// BeaconHTTPClient implements BeaconClient. It provides golang types over the basic Beacon API.
type BeaconHTTPClient struct {
cl client.HTTP
}

func NewBeaconHTTPClient(cl client.HTTP) *BeaconHTTPClient {
return &BeaconHTTPClient{cl}
}

func (cl *BeaconHTTPClient) apiReq(ctx context.Context, dest any, reqPath string, reqQuery url.Values) error {
headers := http.Header{}
headers.Add("Accept", "application/json")
resp, err := cl.cl.Get(ctx, reqPath, reqQuery, headers)
Expand All @@ -63,6 +87,84 @@ func (cl *L1BeaconClient) apiReq(ctx context.Context, dest any, reqPath string,
return nil
}

func (cl *BeaconHTTPClient) NodeVersion(ctx context.Context) (string, error) {
var resp eth.APIVersionResponse
if err := cl.apiReq(ctx, &resp, versionMethod, nil); err != nil {
return "", err
}
return resp.Data.Version, nil
}

func (cl *BeaconHTTPClient) ConfigSpec(ctx context.Context) (eth.APIConfigResponse, error) {
var configResp eth.APIConfigResponse
if err := cl.apiReq(ctx, &configResp, specMethod, nil); err != nil {
return eth.APIConfigResponse{}, err
}
return configResp, nil
}

func (cl *BeaconHTTPClient) BeaconGenesis(ctx context.Context) (eth.APIGenesisResponse, error) {
var genesisResp eth.APIGenesisResponse
if err := cl.apiReq(ctx, &genesisResp, genesisMethod, nil); err != nil {
return eth.APIGenesisResponse{}, err
}
return genesisResp, nil
}

func (cl *BeaconHTTPClient) BeaconBlobSideCars(ctx context.Context, fetchAllSidecars bool, slot uint64, hashes []eth.IndexedBlobHash) (eth.APIGetBlobSidecarsResponse, error) {
reqPath := path.Join(sidecarsMethodPrefix, strconv.FormatUint(slot, 10))
var reqQuery url.Values
if !fetchAllSidecars {
reqQuery = url.Values{}
for i := range hashes {
reqQuery.Add("indices", strconv.FormatUint(hashes[i].Index, 10))
}
}
var resp eth.APIGetBlobSidecarsResponse
if err := cl.apiReq(ctx, &resp, reqPath, reqQuery); err != nil {
return eth.APIGetBlobSidecarsResponse{}, err
}
return resp, nil
}

type ClientPool[T any] struct {
clients []T
index int
}

func NewClientPool[T any](clients ...T) *ClientPool[T] {
return &ClientPool[T]{
clients: clients,
index: 0,
}
}

func (p *ClientPool[T]) Len() int {
return len(p.clients)
}

func (p *ClientPool[T]) Get() T {
return p.clients[p.index]
}

func (p *ClientPool[T]) MoveToNext() {
p.index += 1
if p.index == len(p.clients) {
p.index = 0
}
}

// NewL1BeaconClient returns a client for making requests to an L1 consensus layer node.
// Fallbacks are optional clients that will be used for fetching blobs. L1BeaconClient will rotate between
// the `cl` and the fallbacks whenever a client runs into an error while fetching blobs.
func NewL1BeaconClient(cl BeaconClient, cfg L1BeaconClientConfig, fallbacks ...BlobSideCarsFetcher) *L1BeaconClient {
cs := append([]BlobSideCarsFetcher{cl}, fallbacks...)
return &L1BeaconClient{
cl: cl,
pool: NewClientPool[BlobSideCarsFetcher](cs...),
cfg: cfg}
}

type TimeToSlotFn func(timestamp uint64) (uint64, error)

// GetTimeToSlotFn returns a function that converts a timestamp to a slot number.
Expand All @@ -73,20 +175,20 @@ func (cl *L1BeaconClient) GetTimeToSlotFn(ctx context.Context) (TimeToSlotFn, er
return cl.timeToSlotFn, nil
}

var genesisResp eth.APIGenesisResponse
if err := cl.apiReq(ctx, &genesisResp, genesisMethod, nil); err != nil {
genesis, err := cl.cl.BeaconGenesis(ctx)
if err != nil {
return nil, err
}

var configResp eth.APIConfigResponse
if err := cl.apiReq(ctx, &configResp, specMethod, nil); err != nil {
config, err := cl.cl.ConfigSpec(ctx)
if err != nil {
return nil, err
}

genesisTime := uint64(genesisResp.Data.GenesisTime)
secondsPerSlot := uint64(configResp.Data.SecondsPerSlot)
genesisTime := uint64(genesis.Data.GenesisTime)
secondsPerSlot := uint64(config.Data.SecondsPerSlot)
if secondsPerSlot == 0 {
return nil, fmt.Errorf("got bad value for seconds per slot: %v", configResp.Data.SecondsPerSlot)
return nil, fmt.Errorf("got bad value for seconds per slot: %v", config.Data.SecondsPerSlot)
}
cl.timeToSlotFn = func(timestamp uint64) (uint64, error) {
if timestamp < genesisTime {
Expand All @@ -97,6 +199,21 @@ func (cl *L1BeaconClient) GetTimeToSlotFn(ctx context.Context) (TimeToSlotFn, er
return cl.timeToSlotFn, nil
}

func (cl *L1BeaconClient) fetchSidecars(ctx context.Context, slot uint64, hashes []eth.IndexedBlobHash) (eth.APIGetBlobSidecarsResponse, error) {
var errs []error
for i := 0; i < cl.pool.Len(); i++ {
f := cl.pool.Get()
resp, err := f.BeaconBlobSideCars(ctx, cl.cfg.FetchAllSidecars, slot, hashes)
if err != nil {
cl.pool.MoveToNext()
errs = append(errs, err)
} else {
return resp, nil
}
}
return eth.APIGetBlobSidecarsResponse{}, errors.Join(errs...)
}

// GetBlobSidecars fetches blob sidecars that were confirmed in the specified
// L1 block with the given indexed hashes.
// Order of the returned sidecars is guaranteed to be that of the hashes.
Expand All @@ -114,17 +231,8 @@ func (cl *L1BeaconClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRe
return nil, fmt.Errorf("error in converting ref.Time to slot: %w", err)
}

reqPath := path.Join(sidecarsMethodPrefix, strconv.FormatUint(slot, 10))
var reqQuery url.Values
if !cl.cfg.FetchAllSidecars {
reqQuery = url.Values{}
for i := range hashes {
reqQuery.Add("indices", strconv.FormatUint(hashes[i].Index, 10))
}
}

var resp eth.APIGetBlobSidecarsResponse
if err := cl.apiReq(ctx, &resp, reqPath, reqQuery); err != nil {
resp, err := cl.fetchSidecars(ctx, slot, hashes)
if err != nil {
return nil, fmt.Errorf("failed to fetch blob sidecars for slot %v block %v: %w", slot, ref, err)
}

Expand Down Expand Up @@ -192,9 +300,5 @@ func blobsFromSidecars(blobSidecars []*eth.BlobSidecar, hashes []eth.IndexedBlob

// GetVersion fetches the version of the Beacon-node.
func (cl *L1BeaconClient) GetVersion(ctx context.Context) (string, error) {
var resp eth.APIVersionResponse
if err := cl.apiReq(ctx, &resp, versionMethod, nil); err != nil {
return "", err
}
return resp.Data.Version, nil
return cl.cl.NodeVersion(ctx)
}
Loading

0 comments on commit 39bc6f4

Please sign in to comment.