Skip to content

Commit

Permalink
Draft distributed payload retrieval
Browse files Browse the repository at this point in the history
Signed-off-by: litt3 <[email protected]>
  • Loading branch information
litt3 committed Jan 31, 2025
1 parent 7ce439f commit 1026ea2
Show file tree
Hide file tree
Showing 16 changed files with 350 additions and 146 deletions.
26 changes: 13 additions & 13 deletions api/clients/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ type PayloadClientConfig struct {
BlobVersion v2.BlobVersion
}

// RelayPayloadRetrieverConfig contains an embedded PayloadClientConfig, plus all additional configuration values needed
// by a RelayPayloadRetriever
type RelayPayloadRetrieverConfig struct {
// PayloadRetrieverConfig contains an embedded PayloadClientConfig, plus all additional configuration values needed
// by a PayloadRetriever
type PayloadRetrieverConfig struct {
PayloadClientConfig

// The timeout duration for relay calls to retrieve blobs.
RelayTimeout time.Duration
// The timeout duration for calls to retrieve blobs.
FetchTimeout time.Duration
}

// PayloadDisperserConfig contains an embedded PayloadClientConfig, plus all additional configuration values needed
Expand Down Expand Up @@ -125,13 +125,13 @@ func (cc *PayloadClientConfig) checkAndSetDefaults() error {
return nil
}

// GetDefaultRelayPayloadRetrieverConfig creates a RelayPayloadRetrieverConfig with default values
// GetDefaultPayloadRetrieverConfig creates a GetDefaultPayloadRetrieverConfig with default values
//
// NOTE: EthRpcUrl and EigenDACertVerifierAddr do not have defined defaults. These must always be specifically configured.
func GetDefaultRelayPayloadRetrieverConfig() *RelayPayloadRetrieverConfig {
return &RelayPayloadRetrieverConfig{
func GetDefaultPayloadRetrieverConfig() *PayloadRetrieverConfig {
return &PayloadRetrieverConfig{
PayloadClientConfig: *getDefaultPayloadClientConfig(),
RelayTimeout: 5 * time.Second,
FetchTimeout: 5 * time.Second,
}
}

Expand All @@ -140,15 +140,15 @@ func GetDefaultRelayPayloadRetrieverConfig() *RelayPayloadRetrieverConfig {
// 1. If a config value is 0, and a 0 value makes sense, do nothing.
// 2. If a config value is 0, but a 0 value doesn't make sense and a default value is defined, then set it to the default.
// 3. If a config value is 0, but a 0 value doesn't make sense and a default value isn't defined, return an error.
func (rc *RelayPayloadRetrieverConfig) checkAndSetDefaults() error {
func (rc *PayloadRetrieverConfig) checkAndSetDefaults() error {
err := rc.PayloadClientConfig.checkAndSetDefaults()
if err != nil {
return err
}

defaultConfig := GetDefaultRelayPayloadRetrieverConfig()
if rc.RelayTimeout == 0 {
rc.RelayTimeout = defaultConfig.RelayTimeout
defaultConfig := GetDefaultPayloadRetrieverConfig()
if rc.FetchTimeout == 0 {
rc.FetchTimeout = defaultConfig.FetchTimeout
}

return nil
Expand Down
116 changes: 116 additions & 0 deletions api/clients/v2/distributed_payload_retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package clients

import (
"context"
"fmt"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
"github.com/Layr-Labs/eigenda/api/clients/v2/verification"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
)

// DistributedPayloadRetriever provides the ability to get payloads from the EigenDA nodes directly
//
// This struct is goroutine safe.
type DistributedPayloadRetriever struct {
logger logging.Logger
config PayloadRetrieverConfig
codec codecs.BlobCodec
retrievalClient RetrievalClient
g1Srs []bn254.G1Affine
}

var _ PayloadRetriever = &DistributedPayloadRetriever{}

// GetPayload iteratively attempts to retrieve a given blob from the quorums listed in the EigenDACert.
//
// If the blob is successfully retrieved, then the blob is verified. If the verification succeeds, the blob is decoded
// to yield the payload (the original user data, with no padding or any modification), and the payload is returned.
func (pr *DistributedPayloadRetriever) GetPayload(
ctx context.Context,
eigenDACert *verification.EigenDACert,
) ([]byte, error) {

blobKey, err := eigenDACert.ComputeBlobKey()
if err != nil {
return nil, fmt.Errorf("compute blob key: %w", err)
}

blobHeader := eigenDACert.BlobInclusionInfo.BlobCertificate.BlobHeader
convertedCommitment, err := verification.BlobCommitmentsBindingToInternal(&blobHeader.Commitment)
if err != nil {
return nil, fmt.Errorf("convert commitments binding to internal: %w", err)
}

// TODO (litt3): Add a feature which keeps chunks from previous quorums, and just fills in gaps
for _, quorumID := range blobHeader.QuorumNumbers {
blobBytes, err := pr.getBlobWithTimeout(
ctx,
*blobKey,
blobHeader.Version,
*convertedCommitment,
eigenDACert.BatchHeader.ReferenceBlockNumber,
quorumID)

if err != nil {
pr.logger.Warn(
"blob couldn't be retrieved from quorum",
"blobKey", blobKey.Hex(),
"quorumId", quorumID,
"error", err)
continue
}

err = verification.CheckBlobLength(blobBytes, convertedCommitment.Length)
if err != nil {
pr.logger.Warn("check blob length", "blobKey", blobKey.Hex(), "quorumID", quorumID, "error", err)
continue
}
err = verification.VerifyBlobAgainstCert(blobKey, blobBytes, convertedCommitment.Commitment, pr.g1Srs)
if err != nil {
pr.logger.Warn("verify blob against cert", "blobKey", blobKey.Hex(), "quorumID", quorumID, "error", err)
continue
}

payload, err := pr.codec.DecodeBlob(blobBytes)
if err != nil {
pr.logger.Error(
`Cert verification was successful, but decode blob failed!
This is likely a problem with the local blob codec configuration,
but could potentially indicate a maliciously generated blob certificate.
It should not be possible for an honestly generated certificate to verify
for an invalid blob!`,
"blobKey", blobKey.Hex(), "quorumID", quorumID, "eigenDACert", eigenDACert, "error", err)
return nil, fmt.Errorf("decode blob: %w", err)
}

return payload, nil
}

return nil, fmt.Errorf("unable to retrieve payload from quorums %v", blobHeader.QuorumNumbers)
}

// getBlobWithTimeout attempts to get a blob from a given quorum, and times out based on config.FetchTimeout
func (pr *DistributedPayloadRetriever) getBlobWithTimeout(
ctx context.Context,
blobKey corev2.BlobKey,
blobVersion corev2.BlobVersion,
blobCommitments encoding.BlobCommitments,
referenceBlockNumber uint32,
quorumID core.QuorumID) ([]byte, error) {

timeoutCtx, cancel := context.WithTimeout(ctx, pr.config.FetchTimeout)
defer cancel()

return pr.retrievalClient.GetBlob(
timeoutCtx,
blobKey,
blobVersion,
blobCommitments,
uint64(referenceBlockNumber),
quorumID)
}
54 changes: 44 additions & 10 deletions api/clients/v2/mock/retrieval_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1026ea2

Please sign in to comment.