Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create payload disperser #1159

Merged
merged 25 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions api/clients/codecs/blob_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,26 @@ func GenericDecodeBlob(data []byte) ([]byte, error) {

return data, nil
}

// CreateCodec creates a new BlobCodec based on the defined polynomial form of payloads, and the desired
// BlobEncodingVersion
func CreateCodec(payloadPolynomialForm PolynomialForm, version BlobEncodingVersion) (BlobCodec, error) {
bxue-l2 marked this conversation as resolved.
Show resolved Hide resolved
lowLevelCodec, err := BlobEncodingVersionToCodec(version)
if err != nil {
return nil, fmt.Errorf("create low level codec: %w", err)
}

switch payloadPolynomialForm {
case PolynomialFormCoeff:
// Data must NOT be IFFTed during blob construction, since the payload is already in PolynomialFormCoeff after
// being encoded.
return NewNoIFFTCodec(lowLevelCodec), nil
case PolynomialFormEval:
// Data MUST be IFFTed during blob construction, since the payload is in PolynomialFormEval after being encoded,
// but must be in PolynomialFormCoeff to produce a valid blob.
return NewIFFTCodec(lowLevelCodec), nil
default:
return nil, fmt.Errorf("unsupported polynomial form: %d", payloadPolynomialForm)
}

}
36 changes: 35 additions & 1 deletion api/clients/v2/mock/cert_verifier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions api/clients/v2/payload_client_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package clients

import (
"time"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
"github.com/Layr-Labs/eigenda/core"
v2 "github.com/Layr-Labs/eigenda/core/v2"
)

// PayloadClientConfig contains configuration values that are needed by both PayloadRetriever and PayloadDisperser
type PayloadClientConfig struct {
// The blob encoding version to use when writing and reading blobs
BlobEncodingVersion codecs.BlobEncodingVersion

// The Ethereum RPC URL to use for querying the Ethereum blockchain.
ethenotethan marked this conversation as resolved.
Show resolved Hide resolved
EthRpcUrl string

// The address of the EigenDACertVerifier contract
EigenDACertVerifierAddr string

// PayloadPolynomialForm is the initial form of a Payload after being encoded. The configured form does not imply
// any restrictions on the contents of a payload: it merely dictates how payload data is treated after being
// encoded.
//
// Since blobs sent to the disperser must be in coefficient form, the initial form of the encoded payload dictates
// what data processing must be performed during blob construction.
//
// The chosen form also dictates how the KZG commitment made to the blob can be used. If the encoded payload starts
// in PolynomialFormEval (meaning the data WILL be IFFTed before computing the commitment) then it will be possible
// to open points on the KZG commitment to prove that the field elements correspond to the commitment. If the
// encoded payload starts in PolynomialFormCoeff (meaning the data will NOT be IFFTed before computing the
// commitment) then it will not be possible to create a commitment opening: the blob will need to be supplied in its
// entirety to perform a verification that any part of the data matches the KZG commitment.
PayloadPolynomialForm codecs.PolynomialForm

// The timeout duration for contract calls
ContractCallTimeout time.Duration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prob should mention a default. Typically I define a defaultPayloadClientConfig struct with the default values, and assign those in the constructor for any value that wasn't set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be worth adding some guidelines for configuration classes to the style guide.

Copy link
Contributor Author

@litt3 litt3 Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samlaf Could you clarify what config architecture you're proposing here? Creating a constructor for the struct? Creating a method which checks and sets missing defaults?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added TODO for me to add this to style guide: #1137 (comment)

In terms of my personal preference, I like:

  • no constructor for config structs (that's very much against the point of having extendability, which requires using go's default zero value initialization)
  • having the constructor validate the config and inject default values over zero values (note: this requires care that zero values are always meaningless, which is not always the case!)
  • validation + injecting defualt values can be part of a separate function if it's a large config struct that would overwhelm the rest of constructor logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no constructor for config structs (that's very much against the point of having extendability, which requires using go's default zero value initialization)

Could you explain what you mean by this? What's the problem with having a constructor for ConfigA, and if ConfigA extends ConfigB, call the constructor for ConfigB in the constructor of ConfigA?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created some config utilities in 959e4564. LMK what you think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Having a default constructor is totally fine, and is actually a good idea! What I meant is that users shouldn't be constructing the config by using a constructor of the type

func NewConfig(fieldA, fieldB, fieldC, fieldD,...) Config {
   return Config{
      fieldA,
      fieldB,
      ...
      }
}

Because this is dumb


BlobVersion v2.BlobVersion

Quorums []core.QuorumID
samlaf marked this conversation as resolved.
Show resolved Hide resolved
ethenotethan marked this conversation as resolved.
Show resolved Hide resolved
}
222 changes: 222 additions & 0 deletions api/clients/v2/payload_disperser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package clients

import (
"context"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
"github.com/Layr-Labs/eigenda/api/clients/v2/verification"
dispgrpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/common/geth"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
)

// PayloadDisperser provides the ability to disperse payloads to EigenDA
samlaf marked this conversation as resolved.
Show resolved Hide resolved
//
// This struct is goroutine safe.
type PayloadDisperser struct {
logger logging.Logger
config *PayloadDisperserConfig
codec codecs.BlobCodec
disperserClient DisperserClient
certVerifier verification.ICertVerifier
}

// BuildPayloadDisperser builds a PayloadDisperser from config structs
func BuildPayloadDisperser(
samlaf marked this conversation as resolved.
Show resolved Hide resolved
logger logging.Logger,
payloadDisperserConfig *PayloadDisperserConfig,
samlaf marked this conversation as resolved.
Show resolved Hide resolved
disperserClientConfig *DisperserClientConfig,
// signer to sign blob dispersal requests
bxue-l2 marked this conversation as resolved.
Show resolved Hide resolved
signer core.BlobRequestSigner,
// prover is used to compute commitments to a new blob during the dispersal process
//
// IMPORTANT: it is permissible for the prover parameter to be nil, but operating with this configuration
// puts a trust assumption on the disperser. With a nil prover, the disperser is responsible for computing
// the commitments to a blob, and the PayloadDisperser doesn't have a mechanism to verify these commitments.
samlaf marked this conversation as resolved.
Show resolved Hide resolved
//
// TODO: In the future, an optimized method of commitment verification using fiat shamir transformation will
// be implemented. This feature will allow a PayloadDisperser to offload commitment generation onto the
// disperser, but the disperser's commitments will be verifiable without needing a full-fledged prover
prover encoding.Prover,
accountant *Accountant,
samlaf marked this conversation as resolved.
Show resolved Hide resolved
ethConfig geth.EthClientConfig,
ethenotethan marked this conversation as resolved.
Show resolved Hide resolved
) (*PayloadDisperser, error) {

codec, err := codecs.CreateCodec(
payloadDisperserConfig.PayloadPolynomialForm,
payloadDisperserConfig.BlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("create codec: %w", err)
}

disperserClient, err := NewDisperserClient(disperserClientConfig, signer, prover, accountant)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("new disperser client: %s", err)
}

ethClient, err := geth.NewClient(ethConfig, gethcommon.Address{}, 0, logger)
ethenotethan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("new eth client: %w", err)
}

certVerifier, err := verification.NewCertVerifier(*ethClient, payloadDisperserConfig.EigenDACertVerifierAddr)
ethenotethan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("new cert verifier: %w", err)
}

return &PayloadDisperser{
logger: logger,
config: payloadDisperserConfig,
codec: codec,
disperserClient: disperserClient,
certVerifier: certVerifier,
}, nil
}

// SendPayload executes the dispersal of a payload, with these steps:
//
// 1. Encode payload into a blob
// 2. Disperse the blob
samlaf marked this conversation as resolved.
Show resolved Hide resolved
// 3. Poll the disperser with GetBlobStatus until a terminal status is reached, or until the polling timeout is reached
// 4. Construct an EigenDACert if dispersal is successful
// 5. Verify the constructed cert with a call to an ethereum contract
samlaf marked this conversation as resolved.
Show resolved Hide resolved
// 6. Return the valid cert
func (pd *PayloadDisperser) SendPayload(
ctx context.Context,
// payload is the raw data to be stored on eigenDA
payload []byte,
// salt is added while constructing the blob header
// This salt should be utilized if a blob dispersal fails, in order to retry dispersing the same payload under a
// different blob key.
samlaf marked this conversation as resolved.
Show resolved Hide resolved
salt uint32,
) (*verification.EigenDACert, error) {

blobBytes, err := pd.codec.EncodeBlob(payload)
if err != nil {
return nil, fmt.Errorf("encode payload to blob: %w", err)
}
pd.logger.Debug("Payload encoded to blob")

timeoutCtx, cancel := context.WithTimeout(ctx, pd.config.DisperseBlobTimeout)
defer cancel()
blobStatus, blobKey, err := pd.disperserClient.DisperseBlob(
timeoutCtx,
blobBytes,
pd.config.BlobVersion,
pd.config.Quorums,
salt)
if err != nil {
return nil, fmt.Errorf("disperse blob: %w", err)
}
pd.logger.Debug("Successful DisperseBlob", "blobStatus", blobStatus, "blobKey", blobKey)

timeoutCtx, cancel = context.WithTimeout(ctx, pd.config.BlobCertifiedTimeout)
defer cancel()
blobStatusReply, err := pd.pollBlobStatus(timeoutCtx, blobKey, blobStatus.ToProfobuf())
if err != nil {
return nil, fmt.Errorf("poll blob status: %w", err)
}
pd.logger.Debug("Blob status CERTIFIED", "blobKey", blobKey)

timeoutCtx, cancel = context.WithTimeout(ctx, pd.config.ContractCallTimeout)
defer cancel()
nonSignerStakesAndSignature, err := pd.certVerifier.GetNonSignerStakesAndSignature(
timeoutCtx, blobStatusReply.GetSignedBatch())
if err != nil {
return nil, fmt.Errorf("get non signer stake and signature: %w", err)
}
pd.logger.Debug("Retrieved NonSignerStakesAndSignature", "blobKey", blobKey)

eigenDACert, err := verification.BuildEigenDACert(
blobStatusReply.GetBlobInclusionInfo(),
blobStatusReply.GetSignedBatch().GetHeader(),
nonSignerStakesAndSignature)
if err != nil {
return nil, fmt.Errorf("build eigen da cert: %w", err)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
}
pd.logger.Debug("Constructed EigenDACert", "blobKey", blobKey)

timeoutCtx, cancel = context.WithTimeout(ctx, pd.config.ContractCallTimeout)
defer cancel()
err = pd.certVerifier.VerifyCertV2(timeoutCtx, eigenDACert)
if err != nil {
return nil, fmt.Errorf("verify cert for blobKey %v: %w", blobKey, err)
}
pd.logger.Debug("EigenDACert verified", "blobKey", blobKey)

return eigenDACert, nil
}

// Close is responsible for calling close on all internal clients. This method will do its best to close all internal
// clients, even if some closes fail.
//
// Any and all errors returned from closing internal clients will be joined and returned.
//
// This method should only be called once.
func (pd *PayloadDisperser) Close() error {
err := pd.disperserClient.Close()
if err != nil {
return fmt.Errorf("close disperser client: %w", err)
}

return nil
}

// pollBlobStatus polls the disperser for the status of a blob that has been dispersed
//
// This method will only return a non-nil BlobStatusReply if the blob is reported to be CERTIFIED prior to the timeout.
// In all other cases, this method will return a nil BlobStatusReply, along with an error describing the failure.
func (pd *PayloadDisperser) pollBlobStatus(
samlaf marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
blobKey core.BlobKey,
initialStatus dispgrpc.BlobStatus,
) (*dispgrpc.BlobStatusReply, error) {

previousStatus := initialStatus

ticker := time.NewTicker(pd.config.BlobStatusPollInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil, fmt.Errorf(
"timed out waiting for %v blob status, final status was %v: %w",
ethenotethan marked this conversation as resolved.
Show resolved Hide resolved
dispgrpc.BlobStatus_CERTIFIED,
previousStatus,
ctx.Err())
case <-ticker.C:
// This call to the disperser doesn't have a dedicated timeout configured.
// If this call fails to return in a timely fashion, the timeout configured for the poll loop will trigger
Comment on lines +157 to +158
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this actually true?

Copy link
Contributor Author

@litt3 litt3 Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a timeout set on the context being passed into pollBlobStatusUntilCertified.

If that timeout triggers, it will abort this GetBlobStatus call if necessary, and abort the polling loop, right?

blobStatusReply, err := pd.disperserClient.GetBlobStatus(ctx, blobKey)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
pd.logger.Warn("get blob status", "err", err, "blobKey", blobKey)
continue
}

newStatus := blobStatusReply.Status
if newStatus != previousStatus {
pd.logger.Debug(
"Blob status changed",
"blob key", blobKey,
"previous status", previousStatus,
"new status", newStatus)
previousStatus = newStatus
}

switch newStatus {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a TODO specifying that we'll need to add more in-depth response status processing to derive failover errors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what you mean by "derive failover errors".

Do you mean we will need to expand the types of status returned from the top level SendPayload method, beyond what's included in the existing BlobStatus enum?

Copy link
Contributor

@ethenotethan ethenotethan Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we'll need to interpret terminal status status in a meaningful way to map them into a failover error which tells the proxy to return a status 503 response which a rollup batcher can interpret as "go use other DA"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Is it not true that any error in dispersal would require resorting to failover? If not, what class of error would you expect to not result in failover, and what would be the strategy to handle it? Retries? (trying to understand likely outcomes, to be able to write a helpful TODO)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already have a strategy or framework in-place for this that we can extend from. Would recommend just adding a general todo vs re-articulating a methodology that was months of research

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the TODO

case dispgrpc.BlobStatus_CERTIFIED:
return blobStatusReply, nil
case dispgrpc.BlobStatus_QUEUED, dispgrpc.BlobStatus_ENCODED:
bxue-l2 marked this conversation as resolved.
Show resolved Hide resolved
continue
default:
return nil, fmt.Errorf("terminal dispersal failure for blobKey %v. blob status: %v", blobKey, newStatus)
}
}
}
}
21 changes: 21 additions & 0 deletions api/clients/v2/payload_disperser_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package clients

import "time"

// PayloadDisperserConfig contains an embedded PayloadClientConfig, plus all additional configuration values needed
// by a PayloadDisperser
type PayloadDisperserConfig struct {
PayloadClientConfig

// DisperseBlobTimeout is the duration after which the PayloadDisperser will time out, when trying to disperse a
// blob
DisperseBlobTimeout time.Duration

// BlobCertifiedTimeout is the duration after which the PayloadDisperser will time out, while polling
// the disperser for blob status, waiting for BlobStatus_CERTIFIED
BlobCertifiedTimeout time.Duration

// BlobStatusPollInterval is the tick rate for the PayloadDisperser to use, while polling the disperser with
// GetBlobStatus.
BlobStatusPollInterval time.Duration
}
Loading
Loading