Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement validator payload retrieval #1194

Merged
merged 23 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 66 additions & 3 deletions api/clients/v2/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package clients

import (
"fmt"
"errors"
"time"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
Expand Down Expand Up @@ -54,6 +54,24 @@ type RelayPayloadRetrieverConfig struct {
RelayTimeout time.Duration
}

// DistributedPayloadRetrieverConfig contains an embedded PayloadClientConfig, plus all additional configuration values
// needed by a DistributedPayloadRetriever
type DistributedPayloadRetrieverConfig struct {
PayloadClientConfig

// The timeout duration for calls to retrieve blobs from a given quorum.
RetrievalTimeout time.Duration
samlaf marked this conversation as resolved.
Show resolved Hide resolved

// The address of the BlsOperatorStateRetriever contract
BlsOperatorStateRetrieverAddr string
Comment on lines +73 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no way to derive this address using the service manager as a source of truth?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Looking at reader.go, BlsOperatorStateRetrieverAddr and eigenDAServiceManagerHexAddr are both required arguments. Though I'm not sure why that's the case


// The address of the EigenDAServiceManager contract
EigenDAServiceManagerAddr string

// The number of simultaneous connections to use when fetching chunks during distributed retrieval
ConnectionCount uint
}

// PayloadDisperserConfig contains an embedded PayloadClientConfig, plus all additional configuration values needed
// by a PayloadDisperser
type PayloadDisperserConfig struct {
Expand Down Expand Up @@ -105,11 +123,11 @@ func (cc *PayloadClientConfig) checkAndSetDefaults() error {
// BlobEncodingVersion may be 0, so don't do anything

if cc.EthRpcUrl == "" {
return fmt.Errorf("EthRpcUrl is required")
return errors.New("EthRpcUrl is required")
}

if cc.EigenDACertVerifierAddr == "" {
return fmt.Errorf("EigenDACertVerifierAddr is required")
return errors.New("EigenDACertVerifierAddr is required")
}

// Nothing to do for PayloadPolynomialForm
Expand Down Expand Up @@ -154,6 +172,51 @@ func (rc *RelayPayloadRetrieverConfig) checkAndSetDefaults() error {
return nil
}

// GetDefaultDistributedPayloadRetrieverConfig creates a DistributedPayloadRetrieverConfig with default values
//
// NOTE: The following fields do not have defined defaults and must always be specifically configured:
// - EthRpcUrl
// - EigenDACertVerifierAddr
// - BlsOperatorStateRetrieverAddr
// - EigenDAServiceManagerAddr
func GetDefaultDistributedPayloadRetrieverConfig() *DistributedPayloadRetrieverConfig {
return &DistributedPayloadRetrieverConfig{
PayloadClientConfig: *getDefaultPayloadClientConfig(),
RetrievalTimeout: 20 * time.Second,
ConnectionCount: 10,
}
}

// checkAndSetDefaults checks an existing config struct and performs the following actions:
//
// 1. If a config value is 0, and a 0 value makes sense, do nothing.
cody-littley marked this conversation as resolved.
Show resolved Hide resolved
// 2. If a config value is 0, but a 0 value doesn't make sense and a default value is defined, then set it to the default.
// 3. If a config value is 0, but a 0 value doesn't make sense and a default value isn't defined, return an error.
func (rc *DistributedPayloadRetrieverConfig) checkAndSetDefaults() error {
err := rc.PayloadClientConfig.checkAndSetDefaults()
if err != nil {
return err
}

if rc.BlsOperatorStateRetrieverAddr == "" {
return errors.New("BlsOperatorStateRetrieverAddr is required")
}

if rc.EigenDAServiceManagerAddr == "" {
return errors.New("EigenDAServiceManagerAddr is required")
}

defaultConfig := GetDefaultDistributedPayloadRetrieverConfig()
if rc.RetrievalTimeout == 0 {
rc.RetrievalTimeout = defaultConfig.RetrievalTimeout
}
if rc.ConnectionCount == 0 {
rc.ConnectionCount = defaultConfig.ConnectionCount
}

return nil
}

// GetDefaultPayloadDisperserConfig creates a PayloadDisperserConfig with default values
//
// NOTE: EthRpcUrl and EigenDACertVerifierAddr do not have defined defaults. These must always be specifically configured.
Expand Down
210 changes: 210 additions & 0 deletions api/clients/v2/distributed_payload_retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package clients

import (
"context"
"fmt"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
"github.com/Layr-Labs/eigenda/api/clients/v2/verification"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/core/thegraph"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/kzg"
"github.com/Layr-Labs/eigenda/encoding/kzg/verifier"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
gethcommon "github.com/ethereum/go-ethereum/common"
)

// DistributedPayloadRetriever provides the ability to get payloads from the EigenDA nodes directly
//
// This struct is goroutine safe.
type DistributedPayloadRetriever struct {
logger logging.Logger
config DistributedPayloadRetrieverConfig
codec codecs.BlobCodec
retrievalClient RetrievalClient
g1Srs []bn254.G1Affine
}

var _ PayloadRetriever = &DistributedPayloadRetriever{}

// BuildDistributedPayloadRetriever builds a DistributedPayloadRetriever from config structs.
func BuildDistributedPayloadRetriever(
cody-littley marked this conversation as resolved.
Show resolved Hide resolved
logger logging.Logger,
distributedPayloadRetrieverConfig DistributedPayloadRetrieverConfig,
ethConfig geth.EthClientConfig,
thegraphConfig thegraph.Config,
Copy link
Contributor

@bxue-l2 bxue-l2 Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we no longer require graph node for retrieving the DA node sockets. Instead, we can call a contract, https://github.com/Layr-Labs/eigenlayer-middleware/blob/fe5834371caed60c1d26ab62b5519b0cbdcb42fa/src/SocketRegistry.sol#L17, and use the mapping there to get the IP address, @0x0aa0 is that right?

mainnet: 0x5a3eD432f2De9645940333e4474bBAAB8cf64cf2

preprod: 0x1747ef24dbbb52cB06382d323f455D48dE1AC7fd

testnet: 0x25aFC8944f501545DDB7E7C4C8A0119965AAb166

But that requires changing this part, https://github.com/Layr-Labs/eigenda/blob/master/api/clients/retrieval_client.go#L122

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will be able to retrieve onchain once all the operators have migrated over and set that in storage but this will not be instant so we will probably require the graph for a time while that migration takes place

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@0x0aa0 in V2, does operator inherit registration in V1, or have to register independently in V2? then it looks like migration is not a constraint for V2. But it forces us to support two version for retrieval.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@0x0aa0 Theoretically we should be able to make this change right away for this code, since operators will have to make the required updates to be able to use V2 anyway?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created this issue to address in the near future

kzgConfig kzg.KzgConfig,
) (*DistributedPayloadRetriever, error) {
err := distributedPayloadRetrieverConfig.checkAndSetDefaults()
if err != nil {
return nil, fmt.Errorf("check and set config defaults: %w", err)
}

ethClient, err := geth.NewClient(ethConfig, gethcommon.Address{}, 0, logger)
if err != nil {
return nil, fmt.Errorf("new eth client: %w", err)
}

reader, err := eth.NewReader(
logger,
ethClient,
distributedPayloadRetrieverConfig.BlsOperatorStateRetrieverAddr,
distributedPayloadRetrieverConfig.EigenDAServiceManagerAddr)
if err != nil {
return nil, fmt.Errorf("new reader: %w", err)
}

chainState := eth.NewChainState(reader, ethClient)
indexedChainState := thegraph.MakeIndexedChainState(thegraphConfig, chainState, logger)

kzgVerifier, err := verifier.NewVerifier(&kzgConfig, nil)
if err != nil {
return nil, fmt.Errorf("new verifier: %w", err)
}

retrievalClient := NewRetrievalClient(
logger,
reader,
indexedChainState,
kzgVerifier,
int(distributedPayloadRetrieverConfig.ConnectionCount))

codec, err := codecs.CreateCodec(
distributedPayloadRetrieverConfig.PayloadPolynomialForm,
distributedPayloadRetrieverConfig.BlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("create codec: %w", err)
}

return &DistributedPayloadRetriever{
logger: logger,
config: distributedPayloadRetrieverConfig,
codec: codec,
retrievalClient: retrievalClient,
g1Srs: kzgVerifier.Srs.G1,
}, nil
}

// NewDistributedPayloadRetriever creates a new DistributedPayloadRetriever from already constructed objects
func NewDistributedPayloadRetriever(
logger logging.Logger,
config DistributedPayloadRetrieverConfig,
codec codecs.BlobCodec,
retrievalClient RetrievalClient,
g1Srs []bn254.G1Affine,
) (*DistributedPayloadRetriever, error) {
err := config.checkAndSetDefaults()
if err != nil {
return nil, fmt.Errorf("check and set config defaults: %w", err)
}

return &DistributedPayloadRetriever{
logger: logger,
config: config,
codec: codec,
retrievalClient: retrievalClient,
g1Srs: g1Srs,
}, nil
}

// GetPayload iteratively attempts to retrieve a given blob from the quorums listed in the EigenDACert.
//
// If the blob is successfully retrieved, then the blob verified against the EigenDACert. If the verification succeeds,
// the blob is decoded to yield the payload (the original user data, with no padding or any modification), and the
// payload is returned.
func (pr *DistributedPayloadRetriever) GetPayload(
ctx context.Context,
eigenDACert *verification.EigenDACert,
) ([]byte, error) {

blobKey, err := eigenDACert.ComputeBlobKey()
if err != nil {
return nil, fmt.Errorf("compute blob key: %w", err)
}

blobHeader := eigenDACert.BlobInclusionInfo.BlobCertificate.BlobHeader
commitment, err := verification.BlobCommitmentsBindingToInternal(&blobHeader.Commitment)
if err != nil {
return nil, fmt.Errorf("convert commitments binding to internal: %w", err)
}

// TODO (litt3): Add a feature which keeps chunks from previous quorums, and just fills in gaps
for _, quorumID := range blobHeader.QuorumNumbers {
blobBytes, err := pr.getBlobWithTimeout(
ctx,
*blobKey,
blobHeader.Version,
*commitment,
eigenDACert.BatchHeader.ReferenceBlockNumber,
quorumID)

if err != nil {
pr.logger.Warn(
"blob couldn't be retrieved from quorum",
"blobKey", blobKey.Hex(),
"quorumId", quorumID,
"error", err)
continue
}

err = verification.CheckBlobLength(blobBytes, commitment.Length)
if err != nil {
pr.logger.Warn("check blob length", "blobKey", blobKey.Hex(), "quorumID", quorumID, "error", err)
continue
}

valid, err := verification.GenerateAndCompareBlobCommitment(pr.g1Srs, blobBytes, commitment.Commitment)
if err != nil {
pr.logger.Warn(
"generate and compare blob commitment",
"blobKey", blobKey.Hex(), "quorumID", quorumID, "error", err)
continue
}
if !valid {
pr.logger.Warn("cert is invalid", "blobKey", blobKey.Hex(), "quorumID", quorumID)
bxue-l2 marked this conversation as resolved.
Show resolved Hide resolved
continue
}

payload, err := pr.codec.DecodeBlob(blobBytes)
if err != nil {
pr.logger.Error(
`Cert verification was successful, but decode blob failed!
This is likely a problem with the local blob codec configuration,
but could potentially indicate a maliciously generated blob certificate.
It should not be possible for an honestly generated certificate to verify
for an invalid blob!`,
"blobKey", blobKey.Hex(), "quorumID", quorumID, "eigenDACert", eigenDACert, "error", err)
return nil, fmt.Errorf("decode blob: %w", err)
}

return payload, nil
}

return nil, fmt.Errorf("unable to retrieve payload from quorums %v", blobHeader.QuorumNumbers)
}

// getBlobWithTimeout attempts to get a blob from a given quorum, and times out based on config.RetrievalTimeout
func (pr *DistributedPayloadRetriever) getBlobWithTimeout(
ctx context.Context,
blobKey corev2.BlobKey,
blobVersion corev2.BlobVersion,
blobCommitments encoding.BlobCommitments,
referenceBlockNumber uint32,
quorumID core.QuorumID) ([]byte, error) {

timeoutCtx, cancel := context.WithTimeout(ctx, pr.config.RetrievalTimeout)
defer cancel()

return pr.retrievalClient.GetBlob(
timeoutCtx,
blobKey,
blobVersion,
blobCommitments,
uint64(referenceBlockNumber),
quorumID)
}
54 changes: 44 additions & 10 deletions api/clients/v2/mock/retrieval_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading