diff --git a/api/clients/node_client.go b/api/clients/node_client.go
index f2284620f4..4a86d50ece 100644
--- a/api/clients/node_client.go
+++ b/api/clients/node_client.go
@@ -41,7 +41,7 @@ func (c client) GetBlobHeader(
blobIndex uint32,
) (*core.BlobHeader, *merkletree.Proof, error) {
conn, err := grpc.NewClient(
- core.OperatorSocket(socket).GetRetrievalSocket(),
+ core.OperatorSocket(socket).GetV1RetrievalSocket(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
@@ -86,7 +86,7 @@ func (c client) GetChunks(
chunksChan chan RetrievedChunks,
) {
conn, err := grpc.NewClient(
- core.OperatorSocket(opInfo.Socket).GetRetrievalSocket(),
+ core.OperatorSocket(opInfo.Socket).GetV1RetrievalSocket(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
diff --git a/api/clients/v2/payload_disperser.go b/api/clients/v2/payload_disperser.go
index 3c8923a455..68997af691 100644
--- a/api/clients/v2/payload_disperser.go
+++ b/api/clients/v2/payload_disperser.go
@@ -36,11 +36,9 @@ func BuildPayloadDisperser(log logging.Logger, payloadDispCfg PayloadDisperserCo
kzgConfig *kzg.KzgConfig, encoderCfg *encoding.Config) (*PayloadDisperser, error) {
// 1 - verify key semantics and create signer
- var signer core.BlobRequestSigner
- if len(payloadDispCfg.SignerPaymentKey) == 64 {
- signer = auth.NewLocalBlobRequestSigner(payloadDispCfg.SignerPaymentKey)
- } else {
- return nil, fmt.Errorf("invalid length for signer private key")
+ signer, err := auth.NewLocalBlobRequestSigner(payloadDispCfg.SignerPaymentKey)
+ if err != nil {
+ return nil, fmt.Errorf("new local blob request signer: %w", err)
}
// 2 - create prover
@@ -213,7 +211,7 @@ func (pd *PayloadDisperser) pollBlobStatusUntilCertified(
case <-ctx.Done():
return nil, fmt.Errorf(
"timed out waiting for %v blob status, final status was %v: %w",
- dispgrpc.BlobStatus_CERTIFIED.Descriptor(),
+ dispgrpc.BlobStatus_COMPLETE.Descriptor(),
previousStatus.Descriptor(),
ctx.Err())
case <-ticker.C:
@@ -237,7 +235,7 @@ func (pd *PayloadDisperser) pollBlobStatusUntilCertified(
// TODO: we'll need to add more in-depth response status processing to derive failover errors
switch newStatus {
- case dispgrpc.BlobStatus_CERTIFIED:
+ case dispgrpc.BlobStatus_COMPLETE:
return blobStatusReply, nil
case dispgrpc.BlobStatus_QUEUED, dispgrpc.BlobStatus_ENCODED:
continue
diff --git a/api/clients/v2/retrieval_client.go b/api/clients/v2/retrieval_client.go
index 16a1c9faf0..1cb8f9402d 100644
--- a/api/clients/v2/retrieval_client.go
+++ b/api/clients/v2/retrieval_client.go
@@ -165,7 +165,7 @@ func (r *retrievalClient) getChunksFromOperator(
chunksChan chan clients.RetrievedChunks,
) {
conn, err := grpc.NewClient(
- core.OperatorSocket(opInfo.Socket).GetRetrievalSocket(),
+ core.OperatorSocket(opInfo.Socket).GetV2RetrievalSocket(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
defer func() {
diff --git a/api/docs/disperser_v2.html b/api/docs/disperser_v2.html
index 8b88e4b9f5..0c11c08c87 100644
--- a/api/docs/disperser_v2.html
+++ b/api/docs/disperser_v2.html
@@ -428,7 +428,14 @@
BlobStatusReply
signed_batch |
SignedBatch |
|
- The signed batch. Unset if the status is not CERTIFIED. |
+ The signed batch. Only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
+signed_batch and blob_inclusion_info are only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
+When blob is in GATHERING_SIGNATURES status, the attestation object in signed_batch contains attestation information
+at the point in time. As it gathers more signatures, attestation object will be updated according to the latest attestation status.
+The client can use this intermediate attestation to verify a blob if it has gathered enough signatures.
+Otherwise, it should should poll the GetBlobStatus API until the desired level of attestation has been gathered or status is COMPLETE.
+When blob is in COMPLETE status, the attestation object in signed_batch contains the final attestation information.
+If the final attestation does not meet the client's requirement, the client should try a new dispersal. |
@@ -436,7 +443,7 @@ BlobStatusReply
BlobInclusionInfo |
|
BlobInclusionInfo is the information needed to verify the inclusion of a blob in a batch.
-Unset if the status is not CERTIFIED. |
+Only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
@@ -812,7 +819,7 @@ SignedBatch
BlobStatus
- BlobStatus represents the status of a blob.
The status of a blob is updated as the blob is processed by the disperser.
The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a different state:
- QUEUED
- ENCODED
Terminal states are states that will not be updated to a different state:
- UNKNOWN
- CERTIFIED
- FAILED
- INSUFFICIENT_SIGNATURES
+ BlobStatus represents the status of a blob.
The status of a blob is updated as the blob is processed by the disperser.
The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a different state:
- QUEUED
- ENCODED
- GATHERING_SIGNATURES
Terminal states are states that will not be updated to a different state:
- UNKNOWN
- COMPLETE
- FAILED
Name | Number | Description |
@@ -844,25 +851,27 @@ BlobStatus
- CERTIFIED |
+ GATHERING_SIGNATURES |
3 |
- CERTIFIED means the blob has been dispersed and attested by the DA nodes. |
+ GATHERING_SIGNATURES means that the blob chunks are currently actively being transmitted to validators,requesting for signatures.
+Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser,
+after which the BlobStatus becomes COMPLETE.
+This status is not currently implemented, and is a placeholder for future functionality. |
- FAILED |
+ COMPLETE |
4 |
- FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to
-retry the blob, the client must submit the blob again with different salt (blob key is required to be unique). |
+ COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed.
+This status does not guarantee any signer percentage, so a client should check that the signature has met
+its required threshold, and resubmit a new blob dispersal request if not. |
- INSUFFICIENT_SIGNATURES |
+ FAILED |
5 |
- INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation.
-
-This status is functionally equivalent to FAILED, but is used to indicate that the failure is due to an
-an inability to gather sufficient signatures. |
+ FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to
+retry the blob, the client must submit the blob again with different salt (blob key is required to be unique). |
diff --git a/api/docs/disperser_v2.md b/api/docs/disperser_v2.md
index 9bf70fff52..e889825aad 100644
--- a/api/docs/disperser_v2.md
+++ b/api/docs/disperser_v2.md
@@ -111,8 +111,8 @@ BlobStatusReply is the reply to a BlobStatusRequest.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| status | [BlobStatus](#disperser-v2-BlobStatus) | | The status of the blob. |
-| signed_batch | [SignedBatch](#disperser-v2-SignedBatch) | | The signed batch. Unset if the status is not CERTIFIED. |
-| blob_inclusion_info | [BlobInclusionInfo](#disperser-v2-BlobInclusionInfo) | | BlobInclusionInfo is the information needed to verify the inclusion of a blob in a batch. Unset if the status is not CERTIFIED. |
+| signed_batch | [SignedBatch](#disperser-v2-SignedBatch) | | The signed batch. Only set if the blob status is GATHERING_SIGNATURES or COMPLETE. signed_batch and blob_inclusion_info are only set if the blob status is GATHERING_SIGNATURES or COMPLETE. When blob is in GATHERING_SIGNATURES status, the attestation object in signed_batch contains attestation information at the point in time. As it gathers more signatures, attestation object will be updated according to the latest attestation status. The client can use this intermediate attestation to verify a blob if it has gathered enough signatures. Otherwise, it should should poll the GetBlobStatus API until the desired level of attestation has been gathered or status is COMPLETE. When blob is in COMPLETE status, the attestation object in signed_batch contains the final attestation information. If the final attestation does not meet the client's requirement, the client should try a new dispersal. |
+| blob_inclusion_info | [BlobInclusionInfo](#disperser-v2-BlobInclusionInfo) | | BlobInclusionInfo is the information needed to verify the inclusion of a blob in a batch. Only set if the blob status is GATHERING_SIGNATURES or COMPLETE. |
@@ -294,11 +294,11 @@ The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a different state:
- QUEUED
- ENCODED
+- GATHERING_SIGNATURES
Terminal states are states that will not be updated to a different state:
- UNKNOWN
-- CERTIFIED
+- COMPLETE
- FAILED
-- INSUFFICIENT_SIGNATURES
| Name | Number | Description |
| ---- | ------ | ----------- |
@@ -307,11 +307,9 @@ Terminal states are states that will not be updated to a different state:
This status is functionally equivalent to FAILED, but is used to indicate that the failure is due to an unanticipated bug. |
| QUEUED | 1 | QUEUED means that the blob has been queued by the disperser for processing. The DisperseBlob API is asynchronous, meaning that after request validation, but before any processing, the blob is stored in a queue of some sort, and a response immediately returned to the client. |
| ENCODED | 2 | ENCODED means that the blob has been Reed-Solomon encoded into chunks and is ready to be dispersed to DA Nodes. |
-| CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes. |
-| FAILED | 4 | FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to retry the blob, the client must submit the blob again with different salt (blob key is required to be unique). |
-| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation.
-
-This status is functionally equivalent to FAILED, but is used to indicate that the failure is due to an an inability to gather sufficient signatures. |
+| GATHERING_SIGNATURES | 3 | GATHERING_SIGNATURES means that the blob chunks are currently actively being transmitted to validators,requesting for signatures. Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser, after which the BlobStatus becomes COMPLETE. This status is not currently implemented, and is a placeholder for future functionality. |
+| COMPLETE | 4 | COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed. This status does not guarantee any signer percentage, so a client should check that the signature has met its required threshold, and resubmit a new blob dispersal request if not. |
+| FAILED | 5 | FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to retry the blob, the client must submit the blob again with different salt (blob key is required to be unique). |
diff --git a/api/docs/eigenda-protos.html b/api/docs/eigenda-protos.html
index 3ac96ce284..8f2bc1742d 100644
--- a/api/docs/eigenda-protos.html
+++ b/api/docs/eigenda-protos.html
@@ -2129,7 +2129,14 @@ BlobStatusReply
signed_batch |
SignedBatch |
|
- The signed batch. Unset if the status is not CERTIFIED. |
+ The signed batch. Only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
+signed_batch and blob_inclusion_info are only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
+When blob is in GATHERING_SIGNATURES status, the attestation object in signed_batch contains attestation information
+at the point in time. As it gathers more signatures, attestation object will be updated according to the latest attestation status.
+The client can use this intermediate attestation to verify a blob if it has gathered enough signatures.
+Otherwise, it should should poll the GetBlobStatus API until the desired level of attestation has been gathered or status is COMPLETE.
+When blob is in COMPLETE status, the attestation object in signed_batch contains the final attestation information.
+If the final attestation does not meet the client's requirement, the client should try a new dispersal. |
@@ -2137,7 +2144,7 @@ BlobStatusReply
BlobInclusionInfo |
|
BlobInclusionInfo is the information needed to verify the inclusion of a blob in a batch.
-Unset if the status is not CERTIFIED. |
+Only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
@@ -2513,7 +2520,7 @@ SignedBatch
BlobStatus
- BlobStatus represents the status of a blob.
The status of a blob is updated as the blob is processed by the disperser.
The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a different state:
- QUEUED
- ENCODED
Terminal states are states that will not be updated to a different state:
- UNKNOWN
- CERTIFIED
- FAILED
- INSUFFICIENT_SIGNATURES
+ BlobStatus represents the status of a blob.
The status of a blob is updated as the blob is processed by the disperser.
The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a different state:
- QUEUED
- ENCODED
- GATHERING_SIGNATURES
Terminal states are states that will not be updated to a different state:
- UNKNOWN
- COMPLETE
- FAILED
Name | Number | Description |
@@ -2545,25 +2552,27 @@ BlobStatus
- CERTIFIED |
+ GATHERING_SIGNATURES |
3 |
- CERTIFIED means the blob has been dispersed and attested by the DA nodes. |
+ GATHERING_SIGNATURES means that the blob chunks are currently actively being transmitted to validators,requesting for signatures.
+Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser,
+after which the BlobStatus becomes COMPLETE.
+This status is not currently implemented, and is a placeholder for future functionality. |
- FAILED |
+ COMPLETE |
4 |
- FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to
-retry the blob, the client must submit the blob again with different salt (blob key is required to be unique). |
+ COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed.
+This status does not guarantee any signer percentage, so a client should check that the signature has met
+its required threshold, and resubmit a new blob dispersal request if not. |
- INSUFFICIENT_SIGNATURES |
+ FAILED |
5 |
- INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation.
-
-This status is functionally equivalent to FAILED, but is used to indicate that the failure is due to an
-an inability to gather sufficient signatures. |
+ FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to
+retry the blob, the client must submit the blob again with different salt (blob key is required to be unique). |
diff --git a/api/docs/eigenda-protos.md b/api/docs/eigenda-protos.md
index 8c1428a9a3..67db464447 100644
--- a/api/docs/eigenda-protos.md
+++ b/api/docs/eigenda-protos.md
@@ -814,8 +814,8 @@ BlobStatusReply is the reply to a BlobStatusRequest.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| status | [BlobStatus](#disperser-v2-BlobStatus) | | The status of the blob. |
-| signed_batch | [SignedBatch](#disperser-v2-SignedBatch) | | The signed batch. Unset if the status is not CERTIFIED. |
-| blob_inclusion_info | [BlobInclusionInfo](#disperser-v2-BlobInclusionInfo) | | BlobInclusionInfo is the information needed to verify the inclusion of a blob in a batch. Unset if the status is not CERTIFIED. |
+| signed_batch | [SignedBatch](#disperser-v2-SignedBatch) | | The signed batch. Only set if the blob status is GATHERING_SIGNATURES or COMPLETE. signed_batch and blob_inclusion_info are only set if the blob status is GATHERING_SIGNATURES or COMPLETE. When blob is in GATHERING_SIGNATURES status, the attestation object in signed_batch contains attestation information at the point in time. As it gathers more signatures, attestation object will be updated according to the latest attestation status. The client can use this intermediate attestation to verify a blob if it has gathered enough signatures. Otherwise, it should should poll the GetBlobStatus API until the desired level of attestation has been gathered or status is COMPLETE. When blob is in COMPLETE status, the attestation object in signed_batch contains the final attestation information. If the final attestation does not meet the client's requirement, the client should try a new dispersal. |
+| blob_inclusion_info | [BlobInclusionInfo](#disperser-v2-BlobInclusionInfo) | | BlobInclusionInfo is the information needed to verify the inclusion of a blob in a batch. Only set if the blob status is GATHERING_SIGNATURES or COMPLETE. |
@@ -997,11 +997,11 @@ The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a different state:
- QUEUED
- ENCODED
+- GATHERING_SIGNATURES
Terminal states are states that will not be updated to a different state:
- UNKNOWN
-- CERTIFIED
+- COMPLETE
- FAILED
-- INSUFFICIENT_SIGNATURES
| Name | Number | Description |
| ---- | ------ | ----------- |
@@ -1010,11 +1010,9 @@ Terminal states are states that will not be updated to a different state:
This status is functionally equivalent to FAILED, but is used to indicate that the failure is due to an unanticipated bug. |
| QUEUED | 1 | QUEUED means that the blob has been queued by the disperser for processing. The DisperseBlob API is asynchronous, meaning that after request validation, but before any processing, the blob is stored in a queue of some sort, and a response immediately returned to the client. |
| ENCODED | 2 | ENCODED means that the blob has been Reed-Solomon encoded into chunks and is ready to be dispersed to DA Nodes. |
-| CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes. |
-| FAILED | 4 | FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to retry the blob, the client must submit the blob again with different salt (blob key is required to be unique). |
-| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation.
-
-This status is functionally equivalent to FAILED, but is used to indicate that the failure is due to an an inability to gather sufficient signatures. |
+| GATHERING_SIGNATURES | 3 | GATHERING_SIGNATURES means that the blob chunks are currently actively being transmitted to validators,requesting for signatures. Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser, after which the BlobStatus becomes COMPLETE. This status is not currently implemented, and is a placeholder for future functionality. |
+| COMPLETE | 4 | COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed. This status does not guarantee any signer percentage, so a client should check that the signature has met its required threshold, and resubmit a new blob dispersal request if not. |
+| FAILED | 5 | FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to retry the blob, the client must submit the blob again with different salt (blob key is required to be unique). |
diff --git a/api/grpc/disperser/v2/disperser_v2.pb.go b/api/grpc/disperser/v2/disperser_v2.pb.go
index 4de93e86be..70d37cf84e 100644
--- a/api/grpc/disperser/v2/disperser_v2.pb.go
+++ b/api/grpc/disperser/v2/disperser_v2.pb.go
@@ -28,11 +28,11 @@ const (
// Intermediate states are states that the blob can be in while being processed, and it can be updated to a different state:
// - QUEUED
// - ENCODED
+// - GATHERING_SIGNATURES
// Terminal states are states that will not be updated to a different state:
// - UNKNOWN
-// - CERTIFIED
+// - COMPLETE
// - FAILED
-// - INSUFFICIENT_SIGNATURES
type BlobStatus int32
const (
@@ -48,16 +48,18 @@ const (
BlobStatus_QUEUED BlobStatus = 1
// ENCODED means that the blob has been Reed-Solomon encoded into chunks and is ready to be dispersed to DA Nodes.
BlobStatus_ENCODED BlobStatus = 2
- // CERTIFIED means the blob has been dispersed and attested by the DA nodes.
- BlobStatus_CERTIFIED BlobStatus = 3
+ // GATHERING_SIGNATURES means that the blob chunks are currently actively being transmitted to validators,requesting for signatures.
+ // Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser,
+ // after which the BlobStatus becomes COMPLETE.
+ // This status is not currently implemented, and is a placeholder for future functionality.
+ BlobStatus_GATHERING_SIGNATURES BlobStatus = 3
+ // COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed.
+ // This status does not guarantee any signer percentage, so a client should check that the signature has met
+ // its required threshold, and resubmit a new blob dispersal request if not.
+ BlobStatus_COMPLETE BlobStatus = 4
// FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to
// retry the blob, the client must submit the blob again with different salt (blob key is required to be unique).
- BlobStatus_FAILED BlobStatus = 4
- // INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation.
- //
- // This status is functionally equivalent to FAILED, but is used to indicate that the failure is due to an
- // an inability to gather sufficient signatures.
- BlobStatus_INSUFFICIENT_SIGNATURES BlobStatus = 5
+ BlobStatus_FAILED BlobStatus = 5
)
// Enum value maps for BlobStatus.
@@ -66,17 +68,17 @@ var (
0: "UNKNOWN",
1: "QUEUED",
2: "ENCODED",
- 3: "CERTIFIED",
- 4: "FAILED",
- 5: "INSUFFICIENT_SIGNATURES",
+ 3: "GATHERING_SIGNATURES",
+ 4: "COMPLETE",
+ 5: "FAILED",
}
BlobStatus_value = map[string]int32{
- "UNKNOWN": 0,
- "QUEUED": 1,
- "ENCODED": 2,
- "CERTIFIED": 3,
- "FAILED": 4,
- "INSUFFICIENT_SIGNATURES": 5,
+ "UNKNOWN": 0,
+ "QUEUED": 1,
+ "ENCODED": 2,
+ "GATHERING_SIGNATURES": 3,
+ "COMPLETE": 4,
+ "FAILED": 5,
}
)
@@ -310,10 +312,17 @@ type BlobStatusReply struct {
// The status of the blob.
Status BlobStatus `protobuf:"varint,1,opt,name=status,proto3,enum=disperser.v2.BlobStatus" json:"status,omitempty"`
- // The signed batch. Unset if the status is not CERTIFIED.
+ // The signed batch. Only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
+ // signed_batch and blob_inclusion_info are only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
+ // When blob is in GATHERING_SIGNATURES status, the attestation object in signed_batch contains attestation information
+ // at the point in time. As it gathers more signatures, attestation object will be updated according to the latest attestation status.
+ // The client can use this intermediate attestation to verify a blob if it has gathered enough signatures.
+ // Otherwise, it should should poll the GetBlobStatus API until the desired level of attestation has been gathered or status is COMPLETE.
+ // When blob is in COMPLETE status, the attestation object in signed_batch contains the final attestation information.
+ // If the final attestation does not meet the client's requirement, the client should try a new dispersal.
SignedBatch *SignedBatch `protobuf:"bytes,2,opt,name=signed_batch,json=signedBatch,proto3" json:"signed_batch,omitempty"`
// BlobInclusionInfo is the information needed to verify the inclusion of a blob in a batch.
- // Unset if the status is not CERTIFIED.
+ // Only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
BlobInclusionInfo *BlobInclusionInfo `protobuf:"bytes,3,opt,name=blob_inclusion_info,json=blobInclusionInfo,proto3" json:"blob_inclusion_info,omitempty"`
}
@@ -1202,40 +1211,40 @@ var file_disperser_v2_disperser_v2_proto_rawDesc = []byte{
0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x14, 0x0a, 0x05, 0x75,
0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x75, 0x73, 0x61, 0x67,
- 0x65, 0x2a, 0x6a, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12,
+ 0x65, 0x2a, 0x66, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12,
0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06,
0x51, 0x55, 0x45, 0x55, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x45, 0x4e, 0x43, 0x4f,
- 0x44, 0x45, 0x44, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49,
- 0x45, 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04,
- 0x12, 0x1b, 0x0a, 0x17, 0x49, 0x4e, 0x53, 0x55, 0x46, 0x46, 0x49, 0x43, 0x49, 0x45, 0x4e, 0x54,
- 0x5f, 0x53, 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x53, 0x10, 0x05, 0x32, 0xf2, 0x02,
- 0x0a, 0x09, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x54, 0x0a, 0x0c, 0x44,
- 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x21, 0x2e, 0x64, 0x69,
- 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65,
- 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f,
- 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x69,
- 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22,
- 0x00, 0x12, 0x51, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74,
- 0x75, 0x73, 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76,
- 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75,
- 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e,
- 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x70,
- 0x6c, 0x79, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x43,
- 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x23, 0x2e, 0x64, 0x69, 0x73, 0x70,
- 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f, 0x6d,
- 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21,
+ 0x44, 0x45, 0x44, 0x10, 0x02, 0x12, 0x18, 0x0a, 0x14, 0x47, 0x41, 0x54, 0x48, 0x45, 0x52, 0x49,
+ 0x4e, 0x47, 0x5f, 0x53, 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x53, 0x10, 0x03, 0x12,
+ 0x0c, 0x0a, 0x08, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x04, 0x12, 0x0a, 0x0a,
+ 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x05, 0x32, 0xf2, 0x02, 0x0a, 0x09, 0x44, 0x69,
+ 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x54, 0x0a, 0x0c, 0x44, 0x69, 0x73, 0x70, 0x65,
+ 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x21, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72,
+ 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42,
+ 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x64, 0x69, 0x73,
+ 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72,
+ 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x51, 0x0a,
+ 0x0d, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1f,
0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c,
- 0x6f, 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x70, 0x6c,
- 0x79, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e,
- 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x24, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73,
- 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74,
- 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x64,
- 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50,
- 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79,
- 0x22, 0x00, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
- 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e,
- 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x69, 0x73, 0x70,
- 0x65, 0x72, 0x73, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
+ 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42,
+ 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00,
+ 0x12, 0x5d, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x69,
+ 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x23, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65,
+ 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d,
+ 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x64, 0x69, 0x73,
+ 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f,
+ 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12,
+ 0x5d, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61,
+ 0x74, 0x65, 0x12, 0x24, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76,
+ 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74,
+ 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65,
+ 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65,
+ 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x34,
+ 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79,
+ 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61,
+ 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65,
+ 0x72, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/api/proto/disperser/v2/disperser_v2.proto b/api/proto/disperser/v2/disperser_v2.proto
index e521171705..9bbd34f134 100644
--- a/api/proto/disperser/v2/disperser_v2.proto
+++ b/api/proto/disperser/v2/disperser_v2.proto
@@ -73,10 +73,17 @@ message BlobStatusRequest {
message BlobStatusReply {
// The status of the blob.
BlobStatus status = 1;
- // The signed batch. Unset if the status is not CERTIFIED.
+ // The signed batch. Only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
+ // signed_batch and blob_inclusion_info are only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
+ // When blob is in GATHERING_SIGNATURES status, the attestation object in signed_batch contains attestation information
+ // at the point in time. As it gathers more signatures, attestation object will be updated according to the latest attestation status.
+ // The client can use this intermediate attestation to verify a blob if it has gathered enough signatures.
+ // Otherwise, it should should poll the GetBlobStatus API until the desired level of attestation has been gathered or status is COMPLETE.
+ // When blob is in COMPLETE status, the attestation object in signed_batch contains the final attestation information.
+ // If the final attestation does not meet the client's requirement, the client should try a new dispersal.
SignedBatch signed_batch = 2;
// BlobInclusionInfo is the information needed to verify the inclusion of a blob in a batch.
- // Unset if the status is not CERTIFIED.
+ // Only set if the blob status is GATHERING_SIGNATURES or COMPLETE.
BlobInclusionInfo blob_inclusion_info = 3;
}
@@ -123,11 +130,11 @@ message GetPaymentStateReply {
// Intermediate states are states that the blob can be in while being processed, and it can be updated to a different state:
// - QUEUED
// - ENCODED
+// - GATHERING_SIGNATURES
// Terminal states are states that will not be updated to a different state:
// - UNKNOWN
-// - CERTIFIED
+// - COMPLETE
// - FAILED
-// - INSUFFICIENT_SIGNATURES
enum BlobStatus {
// UNKNOWN means that the status of the blob is unknown.
// This is a catch all and should not be encountered absent a bug.
@@ -144,18 +151,20 @@ enum BlobStatus {
// ENCODED means that the blob has been Reed-Solomon encoded into chunks and is ready to be dispersed to DA Nodes.
ENCODED = 2;
- // CERTIFIED means the blob has been dispersed and attested by the DA nodes.
- CERTIFIED = 3;
+ // GATHERING_SIGNATURES means that the blob chunks are currently actively being transmitted to validators,requesting for signatures.
+ // Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser,
+ // after which the BlobStatus becomes COMPLETE.
+ // This status is not currently implemented, and is a placeholder for future functionality.
+ GATHERING_SIGNATURES = 3;
+
+ // COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed.
+ // This status does not guarantee any signer percentage, so a client should check that the signature has met
+ // its required threshold, and resubmit a new blob dispersal request if not.
+ COMPLETE = 4;
// FAILED means that the blob has failed permanently. Note that this is a terminal state, and in order to
// retry the blob, the client must submit the blob again with different salt (blob key is required to be unique).
- FAILED = 4;
-
- // INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation.
- //
- // This status is functionally equivalent to FAILED, but is used to indicate that the failure is due to an
- // an inability to gather sufficient signatures.
- INSUFFICIENT_SIGNATURES = 5;
+ FAILED = 5;
}
// SignedBatch is a batch of blobs with a signature.
diff --git a/core/auth/v2/auth_test.go b/core/auth/v2/auth_test.go
index 3d22b66c3f..da8dedeea6 100644
--- a/core/auth/v2/auth_test.go
+++ b/core/auth/v2/auth_test.go
@@ -19,7 +19,8 @@ var (
)
func TestAuthentication(t *testing.T) {
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ assert.NoError(t, err)
authenticator := auth.NewAuthenticator()
accountId, err := signer.GetAccountID()
@@ -36,7 +37,8 @@ func TestAuthentication(t *testing.T) {
}
func TestAuthenticationFail(t *testing.T) {
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ assert.NoError(t, err)
authenticator := auth.NewAuthenticator()
accountId, err := signer.GetAccountID()
@@ -45,7 +47,8 @@ func TestAuthenticationFail(t *testing.T) {
header := testHeader(t, accountId)
wrongPrivateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded"
- signer = auth.NewLocalBlobRequestSigner(wrongPrivateKeyHex)
+ signer, err = auth.NewLocalBlobRequestSigner(wrongPrivateKeyHex)
+ assert.NoError(t, err)
// Sign the header
signature, err := signer.SignBlobRequest(header)
@@ -113,7 +116,8 @@ func testHeader(t *testing.T, accountID string) *corev2.BlobHeader {
}
func TestAuthenticatePaymentStateRequestValid(t *testing.T) {
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ assert.NoError(t, err)
authenticator := auth.NewAuthenticator()
signature, err := signer.SignPaymentStateRequest()
@@ -143,11 +147,13 @@ func TestAuthenticatePaymentStateRequestInvalidPublicKey(t *testing.T) {
}
func TestAuthenticatePaymentStateRequestSignatureMismatch(t *testing.T) {
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ assert.NoError(t, err)
authenticator := auth.NewAuthenticator()
// Create a different signer with wrong private key
- wrongSigner := auth.NewLocalBlobRequestSigner("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded")
+ wrongSigner, err := auth.NewLocalBlobRequestSigner("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded")
+ assert.NoError(t, err)
// Sign with wrong key
accountId, err := signer.GetAccountID()
@@ -162,7 +168,8 @@ func TestAuthenticatePaymentStateRequestSignatureMismatch(t *testing.T) {
}
func TestAuthenticatePaymentStateRequestCorruptedSignature(t *testing.T) {
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ assert.NoError(t, err)
authenticator := auth.NewAuthenticator()
accountId, err := signer.GetAccountID()
diff --git a/core/auth/v2/signer.go b/core/auth/v2/signer.go
index bbfef3eca9..6314d5c7a2 100644
--- a/core/auth/v2/signer.go
+++ b/core/auth/v2/signer.go
@@ -4,7 +4,6 @@ import (
"crypto/ecdsa"
"crypto/sha256"
"fmt"
- "log"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/ethereum/go-ethereum/common"
@@ -17,16 +16,16 @@ type LocalBlobRequestSigner struct {
var _ core.BlobRequestSigner = &LocalBlobRequestSigner{}
-func NewLocalBlobRequestSigner(privateKeyHex string) *LocalBlobRequestSigner {
+func NewLocalBlobRequestSigner(privateKeyHex string) (*LocalBlobRequestSigner, error) {
privateKeyBytes := common.FromHex(privateKeyHex)
privateKey, err := crypto.ToECDSA(privateKeyBytes)
if err != nil {
- log.Fatalf("Failed to parse private key: %v", err)
+ return nil, fmt.Errorf("create ECDSA private key: %w", err)
}
return &LocalBlobRequestSigner{
PrivateKey: privateKey,
- }
+ }, nil
}
func (s *LocalBlobRequestSigner) SignBlobRequest(header *core.BlobHeader) ([]byte, error) {
diff --git a/core/auth/v2/signer_test.go b/core/auth/v2/signer_test.go
index b17022bcd3..0fa0f99a8e 100644
--- a/core/auth/v2/signer_test.go
+++ b/core/auth/v2/signer_test.go
@@ -20,7 +20,8 @@ func TestGetAccountID(t *testing.T) {
expectedAccountID := "0x1aa8226f6d354380dDE75eE6B634875c4203e522"
// Create signer instance
- signer := NewLocalBlobRequestSigner(privateKey)
+ signer, err := NewLocalBlobRequestSigner(privateKey)
+ require.NoError(t, err)
// Get account ID
accountID, err := signer.GetAccountID()
@@ -30,7 +31,8 @@ func TestGetAccountID(t *testing.T) {
func TestSignBlobRequest(t *testing.T) {
privateKey := "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded"
- signer := NewLocalBlobRequestSigner(privateKey)
+ signer, err := NewLocalBlobRequestSigner(privateKey)
+ require.NoError(t, err)
accountID, err := signer.GetAccountID()
require.NoError(t, err)
require.Equal(t, "0x1aa8226f6d354380dDE75eE6B634875c4203e522", accountID)
@@ -99,7 +101,8 @@ func TestSignBlobRequest(t *testing.T) {
func TestSignPaymentStateRequest(t *testing.T) {
privateKey := "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded"
- signer := NewLocalBlobRequestSigner(privateKey)
+ signer, err := NewLocalBlobRequestSigner(privateKey)
+ require.NoError(t, err)
expectedAddr := "0x1aa8226f6d354380dDE75eE6B634875c4203e522"
accountID, err := signer.GetAccountID()
require.NoError(t, err)
diff --git a/core/mock/state.go b/core/mock/state.go
index 136f193e85..ffdd14fdaa 100644
--- a/core/mock/state.go
+++ b/core/mock/state.go
@@ -32,6 +32,7 @@ type PrivateOperatorInfo struct {
DispersalPort string
RetrievalPort string
V2DispersalPort string
+ V2RetrievalPort string
}
type PrivateOperatorState struct {
@@ -140,7 +141,8 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl
dispersalPort := fmt.Sprintf("3%03v", 2*i)
retrievalPort := fmt.Sprintf("3%03v", 2*i+1)
v2DispersalPort := fmt.Sprintf("3%03v", 2*i+2)
- socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort, v2DispersalPort)
+ v2RetrievalPort := fmt.Sprintf("3%03v", 2*i+3)
+ socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort)
indexed := &core.IndexedOperatorInfo{
Socket: string(socket),
@@ -161,6 +163,7 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl
DispersalPort: dispersalPort,
RetrievalPort: retrievalPort,
V2DispersalPort: v2DispersalPort,
+ V2RetrievalPort: v2RetrievalPort,
}
indexedOperators[id] = indexed
diff --git a/core/serialization.go b/core/serialization.go
index d69d03c462..5c06d682f2 100644
--- a/core/serialization.go
+++ b/core/serialization.go
@@ -528,7 +528,7 @@ func decode(data []byte, obj any) error {
}
func (s OperatorSocket) GetV1DispersalSocket() string {
- ip, v1DispersalPort, _, _, err := ParseOperatorSocket(string(s))
+ ip, v1DispersalPort, _, _, _, err := ParseOperatorSocket(string(s))
if err != nil {
return ""
}
@@ -536,17 +536,25 @@ func (s OperatorSocket) GetV1DispersalSocket() string {
}
func (s OperatorSocket) GetV2DispersalSocket() string {
- ip, _, _, v2DispersalPort, err := ParseOperatorSocket(string(s))
+ ip, _, _, v2DispersalPort, _, err := ParseOperatorSocket(string(s))
if err != nil || v2DispersalPort == "" {
return ""
}
return fmt.Sprintf("%s:%s", ip, v2DispersalPort)
}
-func (s OperatorSocket) GetRetrievalSocket() string {
- ip, _, retrievalPort, _, err := ParseOperatorSocket(string(s))
+func (s OperatorSocket) GetV1RetrievalSocket() string {
+ ip, _, v1retrievalPort, _, _, err := ParseOperatorSocket(string(s))
if err != nil {
return ""
}
- return fmt.Sprintf("%s:%s", ip, retrievalPort)
+ return fmt.Sprintf("%s:%s", ip, v1retrievalPort)
+}
+
+func (s OperatorSocket) GetV2RetrievalSocket() string {
+ ip, _, _, _, v2RetrievalPort, err := ParseOperatorSocket(string(s))
+ if err != nil || v2RetrievalPort == "" {
+ return ""
+ }
+ return fmt.Sprintf("%s:%s", ip, v2RetrievalPort)
}
diff --git a/core/serialization_test.go b/core/serialization_test.go
index 727933d150..45592b37aa 100644
--- a/core/serialization_test.go
+++ b/core/serialization_test.go
@@ -195,36 +195,37 @@ func TestHashPubKeyG1(t *testing.T) {
}
func TestParseOperatorSocket(t *testing.T) {
- operatorSocket := "localhost:1234;5678;9999"
- host, dispersalPort, retrievalPort, v2DispersalPort, err := core.ParseOperatorSocket(operatorSocket)
+ operatorSocket := "localhost:1234;5678;9999;10001"
+ host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err := core.ParseOperatorSocket(operatorSocket)
assert.NoError(t, err)
assert.Equal(t, "localhost", host)
- assert.Equal(t, "1234", dispersalPort)
- assert.Equal(t, "5678", retrievalPort)
+ assert.Equal(t, "1234", v1DispersalPort)
+ assert.Equal(t, "5678", v1RetrievalPort)
assert.Equal(t, "9999", v2DispersalPort)
+ assert.Equal(t, "10001", v2RetrievalPort)
- host, dispersalPort, retrievalPort, v2DispersalPort, err = core.ParseOperatorSocket("localhost:1234;5678")
+ host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, _, err = core.ParseOperatorSocket("localhost:1234;5678")
assert.NoError(t, err)
assert.Equal(t, "localhost", host)
- assert.Equal(t, "1234", dispersalPort)
- assert.Equal(t, "5678", retrievalPort)
+ assert.Equal(t, "1234", v1DispersalPort)
+ assert.Equal(t, "5678", v1RetrievalPort)
assert.Equal(t, "", v2DispersalPort)
- _, _, _, _, err = core.ParseOperatorSocket("localhost;1234;5678")
+ _, _, _, _, _, err = core.ParseOperatorSocket("localhost;1234;5678")
assert.NotNil(t, err)
- assert.ErrorContains(t, err, "invalid socket address format")
+ assert.ErrorContains(t, err, "invalid host address format")
- _, _, _, _, err = core.ParseOperatorSocket("localhost:12345678")
+ _, _, _, _, _, err = core.ParseOperatorSocket("localhost:12345678")
assert.NotNil(t, err)
- assert.ErrorContains(t, err, "invalid socket address format")
+ assert.ErrorContains(t, err, "invalid v1 dispersal port format")
- _, _, _, _, err = core.ParseOperatorSocket("localhost1234;5678")
+ _, _, _, _, _, err = core.ParseOperatorSocket("localhost1234;5678")
assert.NotNil(t, err)
- assert.ErrorContains(t, err, "invalid socket address format")
+ assert.ErrorContains(t, err, "invalid host address format")
}
func TestGetV1DispersalSocket(t *testing.T) {
- operatorSocket := core.OperatorSocket("localhost:1234;5678;9999")
+ operatorSocket := core.OperatorSocket("localhost:1234;5678;9999;1025")
socket := operatorSocket.GetV1DispersalSocket()
assert.Equal(t, "localhost:1234", socket)
@@ -234,28 +235,84 @@ func TestGetV1DispersalSocket(t *testing.T) {
operatorSocket = core.OperatorSocket("localhost:1234;5678;")
socket = operatorSocket.GetV1DispersalSocket()
- assert.Equal(t, "localhost:1234", socket)
+ assert.Equal(t, "", socket)
operatorSocket = core.OperatorSocket("localhost:1234")
socket = operatorSocket.GetV1DispersalSocket()
assert.Equal(t, "", socket)
}
-func TestGetRetrievalSocket(t *testing.T) {
- operatorSocket := core.OperatorSocket("localhost:1234;5678;9999")
- socket := operatorSocket.GetRetrievalSocket()
+func TestGetV1RetrievalSocket(t *testing.T) {
+ // Valid v1/v2 socket
+ operatorSocket := core.OperatorSocket("localhost:1234;5678;9999;10001")
+ socket := operatorSocket.GetV1RetrievalSocket()
assert.Equal(t, "localhost:5678", socket)
+ // Valid v1 socket
operatorSocket = core.OperatorSocket("localhost:1234;5678")
- socket = operatorSocket.GetRetrievalSocket()
+ socket = operatorSocket.GetV1RetrievalSocket()
assert.Equal(t, "localhost:5678", socket)
+ // Invalid socket testcases
+ operatorSocket = core.OperatorSocket("localhost:1234;5678;9999;10001;")
+ socket = operatorSocket.GetV1RetrievalSocket()
+ assert.Equal(t, "", socket)
+
operatorSocket = core.OperatorSocket("localhost:1234;5678;")
- socket = operatorSocket.GetRetrievalSocket()
- assert.Equal(t, "localhost:5678", socket)
+ socket = operatorSocket.GetV1RetrievalSocket()
+ assert.Equal(t, "", socket)
+
+ operatorSocket = core.OperatorSocket("localhost:;1234;5678;")
+ socket = operatorSocket.GetV1RetrievalSocket()
+ assert.Equal(t, "", socket)
+
+ operatorSocket = core.OperatorSocket("localhost:1234;:;5678;")
+ socket = operatorSocket.GetV1RetrievalSocket()
+ assert.Equal(t, "", socket)
+
+ operatorSocket = core.OperatorSocket("localhost:;;;")
+ socket = operatorSocket.GetV1RetrievalSocket()
+ assert.Equal(t, "", socket)
+
+ operatorSocket = core.OperatorSocket("localhost:1234")
+ socket = operatorSocket.GetV1RetrievalSocket()
+ assert.Equal(t, "", socket)
+}
+
+func TestGetV2RetrievalSocket(t *testing.T) {
+ // Valid v1/v2 socket
+ operatorSocket := core.OperatorSocket("localhost:1234;5678;9999;10001")
+ socket := operatorSocket.GetV2RetrievalSocket()
+ assert.Equal(t, "localhost:10001", socket)
+
+ // Invalid v2 socket
+ operatorSocket = core.OperatorSocket("localhost:1234;5678")
+ socket = operatorSocket.GetV2RetrievalSocket()
+ assert.Equal(t, "", socket)
+
+ // Invalid socket testcases
+ operatorSocket = core.OperatorSocket("localhost:1234;5678;9999;10001;")
+ socket = operatorSocket.GetV2RetrievalSocket()
+ assert.Equal(t, "", socket)
+
+ operatorSocket = core.OperatorSocket("localhost:1234;5678;")
+ socket = operatorSocket.GetV2RetrievalSocket()
+ assert.Equal(t, "", socket)
+
+ operatorSocket = core.OperatorSocket("localhost:;1234;5678;")
+ socket = operatorSocket.GetV2RetrievalSocket()
+ assert.Equal(t, "", socket)
+
+ operatorSocket = core.OperatorSocket("localhost:1234;:;5678;")
+ socket = operatorSocket.GetV2RetrievalSocket()
+ assert.Equal(t, "", socket)
+
+ operatorSocket = core.OperatorSocket("localhost:;;;")
+ socket = operatorSocket.GetV2RetrievalSocket()
+ assert.Equal(t, "", socket)
operatorSocket = core.OperatorSocket("localhost:1234")
- socket = operatorSocket.GetRetrievalSocket()
+ socket = operatorSocket.GetV2RetrievalSocket()
assert.Equal(t, "", socket)
}
diff --git a/core/state.go b/core/state.go
index 6993fca175..1e34c11c77 100644
--- a/core/state.go
+++ b/core/state.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"math/big"
+ "net"
"slices"
"strings"
)
@@ -19,48 +20,73 @@ func (s OperatorSocket) String() string {
return string(s)
}
-func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort, v2DispersalPort string) OperatorSocket {
- if v2DispersalPort == "" {
+func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort string) OperatorSocket {
+ //TODO: Add config checks for invalid v1/v2 configs -- for v1, both v2 ports must be empty and for v2, both ports must be valid, reject any other combinations.
+ if v2DispersalPort == "" && v2RetrievalPort == "" {
return OperatorSocket(fmt.Sprintf("%s:%s;%s", nodeIP, dispersalPort, retrievalPort))
}
- return OperatorSocket(fmt.Sprintf("%s:%s;%s;%s", nodeIP, dispersalPort, retrievalPort, v2DispersalPort))
+ return OperatorSocket(fmt.Sprintf("%s:%s;%s;%s;%s", nodeIP, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort))
}
type StakeAmount = *big.Int
-func ParseOperatorSocket(socket string) (host string, dispersalPort string, retrievalPort string, v2DispersalPort string, err error) {
+func ParseOperatorSocket(socket string) (host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort string, err error) {
+
s := strings.Split(socket, ";")
- if len(s) == 2 {
- // no v2 dispersal port
- retrievalPort = s[1]
- s = strings.Split(s[0], ":")
- if len(s) != 2 {
- err = fmt.Errorf("invalid socket address format: %s", socket)
- return
- }
- host = s[0]
- dispersalPort = s[1]
+ host, v1DispersalPort, err = net.SplitHostPort(s[0])
+ if err != nil {
+ err = fmt.Errorf("invalid host address format in %s: it must specify valid IP or host name (ex. 0.0.0.0:32004;32005;32006;32007)", socket)
return
}
+ if _, err = net.LookupHost(host); err != nil {
+ //Invalid host
+ host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err =
+ "", "", "", "", "",
+ fmt.Errorf("invalid host address format in %s: it must specify valid IP or host name (ex. 0.0.0.0:32004;32005;32006;32007)", socket)
+ return
+ }
+ if err = ValidatePort(v1DispersalPort); err != nil {
+ host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err =
+ "", "", "", "", "",
+ fmt.Errorf("invalid v1 dispersal port format in %s: it must specify valid v1 dispersal port (ex. 0.0.0.0:32004;32005;32006;32007)", socket)
+ return
+ }
- if len(s) == 3 {
- // all ports specified
+ switch len(s) {
+ case 4:
v2DispersalPort = s[2]
- retrievalPort = s[1]
+ if err = ValidatePort(v2DispersalPort); err != nil {
+ host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err =
+ "", "", "", "", "",
+ fmt.Errorf("invalid v2 dispersal port format in %s: it must specify valid v2 dispersal port (ex. 0.0.0.0:32004;32005;32006;32007)", socket)
+ return
+ }
- s = strings.Split(s[0], ":")
- if len(s) != 2 {
- err = fmt.Errorf("invalid socket address format: %s", socket)
+ v2RetrievalPort = s[3]
+ if err = ValidatePort(v2RetrievalPort); err != nil {
+ host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err =
+ "", "", "", "", "",
+ fmt.Errorf("invalid v2 retrieval port format in %s: it must specify valid v2 retrieval port (ex. 0.0.0.0:32004;32005;32006;32007)", socket)
return
}
- host = s[0]
- dispersalPort = s[1]
+ fallthrough
+ case 2:
+ // V1 Parsing
+ v1RetrievalPort = s[1]
+ if err = ValidatePort(v1RetrievalPort); err != nil {
+ host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err =
+ "", "", "", "", "",
+ fmt.Errorf("invalid v1 retrieval port format in %s: it must specify valid v1 retrieval port (ex. 0.0.0.0:32004;32005;32006;32007)", socket)
+ }
+ return
+ default:
+ host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err =
+ "", "", "", "", "",
+ fmt.Errorf("invalid socket address format %s: it must specify v1 dispersal/retrieval ports, or v2 dispersal/retrieval ports (ex. 0.0.0.0:32004;32005;32006;32007)", socket)
return
}
-
- return "", "", "", "", fmt.Errorf("invalid socket address format %s: it must specify dispersal port, retrieval port, and/or v2 dispersal port (ex. 0.0.0.0:32004;32005;32006)", socket)
}
// OperatorInfo contains information about an operator which is stored on the blockchain state,
diff --git a/core/utils.go b/core/utils.go
index 479e61a872..f2254067a2 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -1,8 +1,10 @@
package core
import (
+ "fmt"
"math"
"math/big"
+ "strconv"
"golang.org/x/exp/constraints"
)
@@ -23,3 +25,15 @@ func NextPowerOf2[T constraints.Integer](d T) T {
nextPower := math.Ceil(math.Log2(float64(d)))
return T(math.Pow(2.0, nextPower))
}
+
+func ValidatePort(portStr string) error {
+ port, err := strconv.Atoi(portStr)
+ if err != nil {
+ return fmt.Errorf("port is not a valid number: %v", err)
+ }
+
+ if port < 1 || port > 65535 {
+ return fmt.Errorf("port number out of valid range (1-65535)")
+ }
+ return err
+}
diff --git a/disperser/apiserver/get_blob_status_v2.go b/disperser/apiserver/get_blob_status_v2.go
index e5f540ba7d..145c28bb98 100644
--- a/disperser/apiserver/get_blob_status_v2.go
+++ b/disperser/apiserver/get_blob_status_v2.go
@@ -34,7 +34,8 @@ func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatu
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob metadata: %s", err.Error()))
}
- if metadata.BlobStatus != dispv2.Certified {
+ // If the blob is not complete or gathering signatures, return the status without the signed batch
+ if metadata.BlobStatus != dispv2.Complete && metadata.BlobStatus != dispv2.GatheringSignatures {
return &pb.BlobStatusReply{
Status: metadata.BlobStatus.ToProfobuf(),
}, nil
@@ -42,27 +43,27 @@ func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatu
cert, _, err := s.blobMetadataStore.GetBlobCertificate(ctx, blobKey)
if err != nil {
- s.logger.Error("failed to get blob certificate for certified blob", "err", err, "blobKey", blobKey.Hex())
+ s.logger.Error("failed to get blob certificate for blob in GatheringSignatures/Complete status", "err", err, "blobKey", blobKey.Hex())
if errors.Is(err, dispcommon.ErrMetadataNotFound) {
return nil, api.NewErrorNotFound("no such blob certificate found")
}
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob certificate: %s", err.Error()))
}
- // For certified blobs, include signed batch and blob inclusion info
+ // For blobs in GatheringSignatures/Complete status, include signed batch and blob inclusion info
blobInclusionInfos, err := s.blobMetadataStore.GetBlobInclusionInfos(ctx, blobKey)
if err != nil {
- s.logger.Error("failed to get blob inclusion info for certified blob", "err", err, "blobKey", blobKey.Hex())
+ s.logger.Error("failed to get blob inclusion info for blob", "err", err, "blobKey", blobKey.Hex())
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob inclusion info: %s", err.Error()))
}
if len(blobInclusionInfos) == 0 {
- s.logger.Error("no blob inclusion info found for certified blob", "blobKey", blobKey.Hex())
+ s.logger.Error("no blob inclusion info found for blob", "blobKey", blobKey.Hex())
return nil, api.NewErrorInternal("no blob inclusion info found")
}
if len(blobInclusionInfos) > 1 {
- s.logger.Warn("multiple inclusion info found for certified blob", "blobKey", blobKey.Hex())
+ s.logger.Warn("multiple inclusion info found for blob", "blobKey", blobKey.Hex())
}
for _, inclusionInfo := range blobInclusionInfos {
@@ -101,6 +102,6 @@ func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatu
}, nil
}
- s.logger.Error("no signed batch found for certified blob", "blobKey", blobKey.Hex())
+ s.logger.Error("no signed batch found for blob", "blobKey", blobKey.Hex())
return nil, api.NewErrorInternal("no signed batch found")
}
diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go
index 8503e55cf8..fbe13c094a 100644
--- a/disperser/apiserver/server_v2_test.go
+++ b/disperser/apiserver/server_v2_test.go
@@ -72,7 +72,8 @@ func TestV2DisperseBlob(t *testing.T) {
}
blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto)
assert.NoError(t, err)
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ assert.NoError(t, err)
sig, err := signer.SignBlobRequest(blobHeader)
assert.NoError(t, err)
@@ -121,7 +122,8 @@ func TestV2DisperseBlobRequestValidation(t *testing.T) {
data := make([]byte, 50)
_, err := rand.Read(data)
assert.NoError(t, err)
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ assert.NoError(t, err)
data = codec.ConvertByPaddingEmptyByte(data)
commitments, err := prover.GetCommitmentsForPaddedLength(data)
assert.NoError(t, err)
@@ -329,7 +331,7 @@ func TestV2GetBlobStatus(t *testing.T) {
err = c.BlobMetadataStore.PutBlobCertificate(ctx, blobCert, nil)
require.NoError(t, err)
- // Non ceritified blob status
+ // Queued/Encoded blob status
status, err := c.DispersalServerV2.GetBlobStatus(ctx, &pbv2.BlobStatusRequest{
BlobKey: blobKey[:],
})
@@ -343,8 +345,8 @@ func TestV2GetBlobStatus(t *testing.T) {
require.NoError(t, err)
require.Equal(t, pbv2.BlobStatus_ENCODED, status.Status)
- // Certified blob status
- err = c.BlobMetadataStore.UpdateBlobStatus(ctx, blobKey, dispv2.Certified)
+ // Complete blob status
+ err = c.BlobMetadataStore.UpdateBlobStatus(ctx, blobKey, dispv2.Complete)
require.NoError(t, err)
batchHeader := &corev2.BatchHeader{
BatchRoot: [32]byte{1, 2, 3},
@@ -384,7 +386,7 @@ func TestV2GetBlobStatus(t *testing.T) {
BlobKey: blobKey[:],
})
require.NoError(t, err)
- require.Equal(t, pbv2.BlobStatus_CERTIFIED, reply.GetStatus())
+ require.Equal(t, pbv2.BlobStatus_COMPLETE, reply.GetStatus())
blobHeaderProto, err := blobHeader.ToProtobuf()
require.NoError(t, err)
blobCertProto, err := blobCert.ToProtobuf()
@@ -530,7 +532,8 @@ func newTestServerV2(t *testing.T) *testComponents {
err = s.RefreshOnchainState(context.Background())
assert.NoError(t, err)
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ assert.NoError(t, err)
p := &peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP("0.0.0.0"),
@@ -581,7 +584,8 @@ func TestInvalidLength(t *testing.T) {
}
blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto)
assert.NoError(t, err)
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ assert.NoError(t, err)
sig, err := signer.SignBlobRequest(blobHeader)
assert.NoError(t, err)
diff --git a/disperser/common/semver/semver.go b/disperser/common/semver/semver.go
index 2e7c80b3e1..b3720361b1 100644
--- a/disperser/common/semver/semver.go
+++ b/disperser/common/semver/semver.go
@@ -31,7 +31,7 @@ func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, oper
operatorSocket := core.OperatorSocket(operators[operatorId].Socket)
var socket string
if useRetrievalSocket {
- socket = operatorSocket.GetRetrievalSocket()
+ socket = operatorSocket.GetV1RetrievalSocket()
} else {
socket = operatorSocket.GetV1DispersalSocket()
}
diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go
index 5882ed84db..95716ced25 100644
--- a/disperser/common/v2/blob.go
+++ b/disperser/common/v2/blob.go
@@ -13,9 +13,9 @@ type BlobStatus uint
const (
Queued BlobStatus = iota
Encoded
- Certified
+ GatheringSignatures
+ Complete
Failed
- InsufficientSignatures
)
func (s BlobStatus) String() string {
@@ -24,12 +24,12 @@ func (s BlobStatus) String() string {
return "Queued"
case Encoded:
return "Encoded"
- case Certified:
- return "Certified"
+ case GatheringSignatures:
+ return "Gathering Signatures"
+ case Complete:
+ return "Complete"
case Failed:
return "Failed"
- case InsufficientSignatures:
- return "Insufficient Signatures"
default:
return "Unknown"
}
@@ -41,12 +41,12 @@ func (s BlobStatus) ToProfobuf() pb.BlobStatus {
return pb.BlobStatus_QUEUED
case Encoded:
return pb.BlobStatus_ENCODED
- case Certified:
- return pb.BlobStatus_CERTIFIED
+ case GatheringSignatures:
+ return pb.BlobStatus_GATHERING_SIGNATURES
+ case Complete:
+ return pb.BlobStatus_COMPLETE
case Failed:
return pb.BlobStatus_FAILED
- case InsufficientSignatures:
- return pb.BlobStatus_INSUFFICIENT_SIGNATURES
default:
return pb.BlobStatus_UNKNOWN
}
@@ -58,12 +58,12 @@ func BlobStatusFromProtobuf(s pb.BlobStatus) (BlobStatus, error) {
return Queued, nil
case pb.BlobStatus_ENCODED:
return Encoded, nil
- case pb.BlobStatus_CERTIFIED:
- return Certified, nil
+ case pb.BlobStatus_GATHERING_SIGNATURES:
+ return GatheringSignatures, nil
+ case pb.BlobStatus_COMPLETE:
+ return Complete, nil
case pb.BlobStatus_FAILED:
return Failed, nil
- case pb.BlobStatus_INSUFFICIENT_SIGNATURES:
- return InsufficientSignatures, nil
default:
return 0, fmt.Errorf("unknown blob status: %v", s)
}
diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go
index 793f4bddcf..776f4b0c80 100644
--- a/disperser/common/v2/blobstore/dynamo_metadata_store.go
+++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go
@@ -59,11 +59,12 @@ const (
var (
statusUpdatePrecondition = map[v2.BlobStatus][]v2.BlobStatus{
- v2.Queued: {},
- v2.Encoded: {v2.Queued},
- v2.Certified: {v2.Encoded},
- v2.Failed: {v2.Queued, v2.Encoded},
- v2.InsufficientSignatures: {v2.Encoded},
+ v2.Queued: {},
+ v2.Encoded: {v2.Queued},
+ v2.GatheringSignatures: {v2.Encoded},
+ // TODO: when GatheringSignatures is fully supported, remove v2.Encoded from below
+ v2.Complete: {v2.Encoded, v2.GatheringSignatures},
+ v2.Failed: {v2.Queued, v2.Encoded, v2.GatheringSignatures},
}
ErrInvalidStateTransition = errors.New("invalid state transition")
)
diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
index d01110c35b..456caa83f5 100644
--- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
+++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
@@ -201,7 +201,7 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
metadata2 := &v2.BlobMetadata{
BlobHeader: blobHeader2,
Signature: []byte{4, 5, 6},
- BlobStatus: v2.Certified,
+ BlobStatus: v2.Complete,
Expiry: uint64(now.Add(time.Hour).Unix()),
NumRetries: 0,
UpdatedAt: uint64(now.UnixNano()),
@@ -227,10 +227,10 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, queued, 0)
- certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified, 0)
+ complete, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Complete, 0)
assert.NoError(t, err)
- assert.Len(t, certified, 1)
- assert.Equal(t, metadata2, certified[0])
+ assert.Len(t, complete, 1)
+ assert.Equal(t, metadata2, complete[0])
queuedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, v2.Queued)
assert.NoError(t, err)
@@ -914,7 +914,7 @@ func TestBlobMetadataStoreUpdateBlobStatus(t *testing.T) {
assert.NoError(t, err)
// Update the blob status to invalid status
- err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Certified)
+ err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Complete)
assert.ErrorIs(t, err, blobstore.ErrInvalidStateTransition)
// Update the blob status to a valid status
diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go
index c584d55d08..a653eed600 100644
--- a/disperser/controller/dispatcher.go
+++ b/disperser/controller/dispatcher.go
@@ -148,7 +148,7 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
for opID, op := range state.IndexedOperators {
opID := opID
op := op
- host, _, _, v2DispersalPort, err := core.ParseOperatorSocket(op.Socket)
+ host, _, _, v2DispersalPort, _, err := core.ParseOperatorSocket(op.Socket)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse operator socket (%s): %w", op.Socket, err)
}
@@ -560,24 +560,24 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, qu
}
if failed {
- err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.InsufficientSignatures)
+ err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Failed)
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
if metadata, ok := batch.Metadata[blobKey]; ok {
- d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.InsufficientSignatures)
+ d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.Failed)
}
continue
}
- err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Certified)
+ err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Complete)
if err != nil {
- multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to certified: %w", blobKey.Hex(), err))
+ multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to complete: %w", blobKey.Hex(), err))
}
if metadata, ok := batch.Metadata[blobKey]; ok {
requestedAt := time.Unix(0, int64(metadata.RequestedAt))
d.metrics.reportE2EDispersalLatency(time.Since(requestedAt))
- d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.Certified)
+ d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.Complete)
}
}
diff --git a/disperser/controller/dispatcher_metrics.go b/disperser/controller/dispatcher_metrics.go
index 1c6aaf34d5..d27f58cbd6 100644
--- a/disperser/controller/dispatcher_metrics.go
+++ b/disperser/controller/dispatcher_metrics.go
@@ -368,15 +368,12 @@ func (m *dispatcherMetrics) reportE2EDispersalLatency(duration time.Duration) {
func (m *dispatcherMetrics) reportCompletedBlob(size int, status dispv2.BlobStatus) {
switch status {
- case dispv2.Certified:
- m.completedBlobs.WithLabelValues("certified", "number").Inc()
- m.completedBlobs.WithLabelValues("certified", "size").Add(float64(size))
+ case dispv2.Complete:
+ m.completedBlobs.WithLabelValues("complete", "number").Inc()
+ m.completedBlobs.WithLabelValues("complete", "size").Add(float64(size))
case dispv2.Failed:
m.completedBlobs.WithLabelValues("failed", "number").Inc()
m.completedBlobs.WithLabelValues("failed", "size").Add(float64(size))
- case dispv2.InsufficientSignatures:
- m.completedBlobs.WithLabelValues("insufficient_signature", "number").Inc()
- m.completedBlobs.WithLabelValues("insufficient_signature", "size").Add(float64(size))
default:
return
}
diff --git a/disperser/controller/dispatcher_test.go b/disperser/controller/dispatcher_test.go
index 6a499c84dd..183ced987c 100644
--- a/disperser/controller/dispatcher_test.go
+++ b/disperser/controller/dispatcher_test.go
@@ -98,10 +98,10 @@ func TestDispatcherHandleBatch(t *testing.T) {
// Test that the blob metadata status are updated
bm0, err := components.BlobMetadataStore.GetBlobMetadata(ctx, objs.blobKeys[0])
require.NoError(t, err)
- require.Equal(t, v2.Certified, bm0.BlobStatus)
+ require.Equal(t, v2.Complete, bm0.BlobStatus)
bm1, err := components.BlobMetadataStore.GetBlobMetadata(ctx, objs.blobKeys[1])
require.NoError(t, err)
- require.Equal(t, v2.Certified, bm1.BlobStatus)
+ require.Equal(t, v2.Complete, bm1.BlobStatus)
// Get batch header
vis, err := components.BlobMetadataStore.GetBlobInclusionInfos(ctx, objs.blobKeys[0])
@@ -173,12 +173,12 @@ func TestDispatcherInsufficientSignatures(t *testing.T) {
for _, blobKey := range failedObjs.blobKeys {
bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
require.NoError(t, err)
- require.Equal(t, v2.InsufficientSignatures, bm.BlobStatus)
+ require.Equal(t, v2.Failed, bm.BlobStatus)
}
for _, blobKey := range successfulObjs.blobKeys {
bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
require.NoError(t, err)
- require.Equal(t, v2.Certified, bm.BlobStatus)
+ require.Equal(t, v2.Complete, bm.BlobStatus)
}
// Get batch header
@@ -245,12 +245,12 @@ func TestDispatcherInsufficientSignatures2(t *testing.T) {
for _, blobKey := range objsInBothQuorum.blobKeys {
bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
require.NoError(t, err)
- require.Equal(t, v2.InsufficientSignatures, bm.BlobStatus)
+ require.Equal(t, v2.Failed, bm.BlobStatus)
}
for _, blobKey := range objsInQuorum1.blobKeys {
bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
require.NoError(t, err)
- require.Equal(t, v2.InsufficientSignatures, bm.BlobStatus)
+ require.Equal(t, v2.Failed, bm.BlobStatus)
}
// Get batch header
diff --git a/disperser/dataapi/docs/v1/V1_docs.go b/disperser/dataapi/docs/v1/V1_docs.go
index 73e7cbf182..f12eef0b3e 100644
--- a/disperser/dataapi/docs/v1/V1_docs.go
+++ b/disperser/dataapi/docs/v1/V1_docs.go
@@ -1141,6 +1141,12 @@ const docTemplateV1 = `{
"items": {
"type": "integer"
}
+ },
+ "y": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
@@ -1149,6 +1155,9 @@ const docTemplateV1 = `{
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -1157,6 +1166,9 @@ const docTemplateV1 = `{
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -1187,6 +1199,12 @@ const docTemplateV1 = `{
"items": {
"type": "integer"
}
+ },
+ "a1": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
diff --git a/disperser/dataapi/docs/v1/V1_swagger.json b/disperser/dataapi/docs/v1/V1_swagger.json
index 5f1ae1cc37..3e7c93d39b 100644
--- a/disperser/dataapi/docs/v1/V1_swagger.json
+++ b/disperser/dataapi/docs/v1/V1_swagger.json
@@ -1137,6 +1137,12 @@
"items": {
"type": "integer"
}
+ },
+ "y": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
@@ -1145,6 +1151,9 @@
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -1153,6 +1162,9 @@
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -1183,6 +1195,12 @@
"items": {
"type": "integer"
}
+ },
+ "a1": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
diff --git a/disperser/dataapi/docs/v1/V1_swagger.yaml b/disperser/dataapi/docs/v1/V1_swagger.yaml
index b845673e11..c76b46042d 100644
--- a/disperser/dataapi/docs/v1/V1_swagger.yaml
+++ b/disperser/dataapi/docs/v1/V1_swagger.yaml
@@ -263,16 +263,24 @@ definitions:
items:
type: integer
type: array
+ "y":
+ items:
+ type: integer
+ type: array
type: object
encoding.G2Commitment:
properties:
x:
$ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
+ "y":
+ $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
type: object
encoding.LengthProof:
properties:
x:
$ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
+ "y":
+ $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
type: object
github_com_Layr-Labs_eigenda_disperser.BlobStatus:
enum:
@@ -296,6 +304,10 @@ definitions:
items:
type: integer
type: array
+ a1:
+ items:
+ type: integer
+ type: array
type: object
semver.SemverMetrics:
properties:
diff --git a/disperser/dataapi/docs/v2/V2_docs.go b/disperser/dataapi/docs/v2/V2_docs.go
index 7995fda7f2..9f59e14556 100644
--- a/disperser/dataapi/docs/v2/V2_docs.go
+++ b/disperser/dataapi/docs/v2/V2_docs.go
@@ -663,6 +663,12 @@ const docTemplateV2 = `{
"items": {
"type": "integer"
}
+ },
+ "y": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
@@ -671,6 +677,9 @@ const docTemplateV2 = `{
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -703,6 +712,12 @@ const docTemplateV2 = `{
"items": {
"type": "integer"
}
+ },
+ "y": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
@@ -731,6 +746,12 @@ const docTemplateV2 = `{
"items": {
"type": "integer"
}
+ },
+ "y": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
@@ -739,6 +760,9 @@ const docTemplateV2 = `{
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -747,6 +771,9 @@ const docTemplateV2 = `{
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -925,9 +952,9 @@ const docTemplateV2 = `{
"x-enum-varnames": [
"Queued",
"Encoded",
- "Certified",
- "Failed",
- "InsufficientSignatures"
+ "GatheringSignatures",
+ "Complete",
+ "Failed"
]
},
"github_com_Layr-Labs_eigenda_disperser_dataapi_v2.SignedBatch": {
@@ -949,6 +976,12 @@ const docTemplateV2 = `{
"items": {
"type": "integer"
}
+ },
+ "a1": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
diff --git a/disperser/dataapi/docs/v2/V2_swagger.json b/disperser/dataapi/docs/v2/V2_swagger.json
index b36b6ba2d1..aaf1d3ff33 100644
--- a/disperser/dataapi/docs/v2/V2_swagger.json
+++ b/disperser/dataapi/docs/v2/V2_swagger.json
@@ -660,6 +660,12 @@
"items": {
"type": "integer"
}
+ },
+ "y": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
@@ -668,6 +674,9 @@
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -700,6 +709,12 @@
"items": {
"type": "integer"
}
+ },
+ "y": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
@@ -728,6 +743,12 @@
"items": {
"type": "integer"
}
+ },
+ "y": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
@@ -736,6 +757,9 @@
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -744,6 +768,9 @@
"properties": {
"x": {
"$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
+ },
+ "y": {
+ "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2"
}
}
},
@@ -922,9 +949,9 @@
"x-enum-varnames": [
"Queued",
"Encoded",
- "Certified",
- "Failed",
- "InsufficientSignatures"
+ "GatheringSignatures",
+ "Complete",
+ "Failed"
]
},
"github_com_Layr-Labs_eigenda_disperser_dataapi_v2.SignedBatch": {
@@ -946,6 +973,12 @@
"items": {
"type": "integer"
}
+ },
+ "a1": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
}
}
},
diff --git a/disperser/dataapi/docs/v2/V2_swagger.yaml b/disperser/dataapi/docs/v2/V2_swagger.yaml
index 3fdaf7fa61..c06ebdb8c2 100644
--- a/disperser/dataapi/docs/v2/V2_swagger.yaml
+++ b/disperser/dataapi/docs/v2/V2_swagger.yaml
@@ -51,11 +51,17 @@ definitions:
items:
type: integer
type: array
+ "y":
+ items:
+ type: integer
+ type: array
type: object
core.G2Point:
properties:
x:
$ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
+ "y":
+ $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
type: object
core.PaymentMetadata:
properties:
@@ -78,6 +84,10 @@ definitions:
items:
type: integer
type: array
+ "y":
+ items:
+ type: integer
+ type: array
type: object
encoding.BlobCommitments:
properties:
@@ -96,16 +106,24 @@ definitions:
items:
type: integer
type: array
+ "y":
+ items:
+ type: integer
+ type: array
type: object
encoding.G2Commitment:
properties:
x:
$ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
+ "y":
+ $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
type: object
encoding.LengthProof:
properties:
x:
$ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
+ "y":
+ $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2'
type: object
github_com_Layr-Labs_eigenda_core_v2.Attestation:
properties:
@@ -238,9 +256,9 @@ definitions:
x-enum-varnames:
- Queued
- Encoded
- - Certified
+ - GatheringSignatures
+ - Complete
- Failed
- - InsufficientSignatures
github_com_Layr-Labs_eigenda_disperser_dataapi_v2.SignedBatch:
properties:
attestation:
@@ -254,6 +272,10 @@ definitions:
items:
type: integer
type: array
+ a1:
+ items:
+ type: integer
+ type: array
type: object
semver.SemverMetrics:
properties:
diff --git a/disperser/dataapi/nonsigner_handler.go b/disperser/dataapi/nonsigner_handler.go
index 7eac596abb..609c629dd4 100644
--- a/disperser/dataapi/nonsigner_handler.go
+++ b/disperser/dataapi/nonsigner_handler.go
@@ -200,7 +200,7 @@ func computeNumFailed(batches []*BatchNonSigningInfo, operatorQuorumIntervals Op
func computeNumResponsible(batches []*BatchNonSigningInfo, operatorQuorumIntervals OperatorQuorumIntervals) map[string]map[uint8]int {
// Create quorumBatches, where quorumBatches[q].AccuBatches is the total number of
// batches in block interval [startBlock, b] for quorum "q".
- quorumBatches := CreatQuorumBatches(batches)
+ quorumBatches := CreatQuorumBatches(CreateQuorumBatchMap(batches))
numResponsible := make(map[string]map[uint8]int)
for op, val := range operatorQuorumIntervals {
diff --git a/disperser/dataapi/nonsigner_utils.go b/disperser/dataapi/nonsigner_utils.go
index 0fcf020f0f..0a8ef2217e 100644
--- a/disperser/dataapi/nonsigner_utils.go
+++ b/disperser/dataapi/nonsigner_utils.go
@@ -209,9 +209,9 @@ func ComputeNumBatches(quorumBatches *QuorumBatches, startBlock, endBlock uint32
return num
}
-// CreatQuorumBatches returns quorumBatches, where quorumBatches[q] is a list of
-// QuorumBatches in ascending order by block number.
-func CreatQuorumBatches(batches []*BatchNonSigningInfo) map[uint8]*QuorumBatches {
+// CreateQuorumBatchMap returns quorumBatchMap, where quorumBatchMap[q][b] means the number of
+// batches at block b that have dispersed to quorum q.
+func CreateQuorumBatchMap(batches []*BatchNonSigningInfo) map[uint8]map[uint32]int {
quorumBatchMap := make(map[uint8]map[uint32]int)
for _, batch := range batches {
for _, q := range batch.QuorumNumbers {
@@ -221,6 +221,12 @@ func CreatQuorumBatches(batches []*BatchNonSigningInfo) map[uint8]*QuorumBatches
quorumBatchMap[q][batch.ReferenceBlockNumber]++
}
}
+ return quorumBatchMap
+}
+
+// CreatQuorumBatches returns quorumBatches, where quorumBatches[q] is a list of
+// QuorumBatches in ascending order by block number.
+func CreatQuorumBatches(quorumBatchMap map[uint8]map[uint32]int) map[uint8]*QuorumBatches {
quorumBatches := make(map[uint8]*QuorumBatches)
for q, s := range quorumBatchMap {
numBatches := make([]*NumBatchesAtBlock, 0)
diff --git a/disperser/dataapi/nonsigner_utils_test.go b/disperser/dataapi/nonsigner_utils_test.go
index 0a0283628e..cad6d5a330 100644
--- a/disperser/dataapi/nonsigner_utils_test.go
+++ b/disperser/dataapi/nonsigner_utils_test.go
@@ -461,7 +461,7 @@ func TestCreatQuorumBatches(t *testing.T) {
},
}
- quorumBatches := dataapi.CreatQuorumBatches(batchNonSigningInfo)
+ quorumBatches := dataapi.CreatQuorumBatches(dataapi.CreateQuorumBatchMap(batchNonSigningInfo))
assert.Equal(t, 3, len(quorumBatches))
diff --git a/disperser/dataapi/operator_handler.go b/disperser/dataapi/operator_handler.go
index 94f70cd3a1..47927db7f4 100644
--- a/disperser/dataapi/operator_handler.go
+++ b/disperser/dataapi/operator_handler.go
@@ -92,11 +92,11 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st
}
operatorSocket := core.OperatorSocket(operatorInfo.Socket)
- retrievalSocket := operatorSocket.GetRetrievalSocket()
- retrievalPortOpen := checkIsOperatorPortOpen(retrievalSocket, 3, oh.logger)
- retrievalOnline, retrievalStatus := false, "port closed or unreachable"
- if retrievalPortOpen {
- retrievalOnline, retrievalStatus = checkServiceOnline(ctx, "node.Retrieval", retrievalSocket, 3*time.Second)
+ v1RetrievalSocket := operatorSocket.GetV1RetrievalSocket()
+ v1RetrievalPortOpen := checkIsOperatorPortOpen(v1RetrievalSocket, 3, oh.logger)
+ v1RetrievalOnline, v1RetrievalStatus := false, "port closed or unreachable"
+ if v1RetrievalPortOpen {
+ v1RetrievalOnline, v1RetrievalStatus = checkServiceOnline(ctx, "node.Retrieval", v1RetrievalSocket, 3*time.Second)
}
v1DispersalSocket := operatorSocket.GetV1DispersalSocket()
@@ -119,6 +119,19 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st
}
}
+ v2RetrievalOnline, v2RetrievalStatus := false, ""
+ v2RetrievalSocket := operatorSocket.GetV2RetrievalSocket()
+ if v2RetrievalSocket == "" {
+ v2RetrievalStatus = "v2 retrieval port is not registered"
+ } else {
+ v2RetrievalPortOpen := checkIsOperatorPortOpen(v2RetrievalSocket, 3, oh.logger)
+ if !v2RetrievalPortOpen {
+ v2RetrievalStatus = "port closed or unreachable"
+ } else {
+ v2RetrievalOnline, v2RetrievalStatus = checkServiceOnline(ctx, "node.v2.Retrieval", v2RetrievalSocket, 3*time.Second)
+ }
+ }
+
// Create the metadata regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
OperatorId: operatorId,
@@ -128,9 +141,12 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st
V2DispersalSocket: v2DispersalSocket,
V2DispersalOnline: v2DispersalOnline,
V2DispersalStatus: v2DispersalStatus,
- RetrievalSocket: retrievalSocket,
- RetrievalOnline: retrievalOnline,
- RetrievalStatus: retrievalStatus,
+ RetrievalSocket: v1RetrievalSocket,
+ RetrievalOnline: v1RetrievalOnline,
+ RetrievalStatus: v1RetrievalStatus,
+ V2RetrievalSocket: v2RetrievalSocket,
+ V2RetrievalOnline: v2RetrievalOnline,
+ V2RetrievalStatus: v2RetrievalStatus,
}
// Log the online status
diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go
index f905328e4c..b282f0ff10 100644
--- a/disperser/dataapi/queried_operators_handlers.go
+++ b/disperser/dataapi/queried_operators_handlers.go
@@ -198,7 +198,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
var isOnline bool
var socket string
if operatorStatus.IndexedOperatorInfo != nil {
- socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket()
+ socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetV1RetrievalSocket()
isOnline = checkIsOperatorPortOpen(socket, 10, logger)
}
diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go
index c28a6ead7f..2875699061 100644
--- a/disperser/dataapi/server.go
+++ b/disperser/dataapi/server.go
@@ -185,6 +185,9 @@ type (
V2DispersalSocket string `json:"v2_dispersal_socket"`
V2DispersalOnline bool `json:"v2_dispersal_online"`
V2DispersalStatus string `json:"v2_dispersal_status"`
+ V2RetrievalSocket string `json:"v2_retrieval_socket"`
+ V2RetrievalOnline bool `json:"v2_retrieval_online"`
+ V2RetrievalStatus string `json:"v2_retrieval_status"`
}
SemverReportResponse struct {
Semver map[string]*semver.SemverMetrics `json:"semver"`
diff --git a/disperser/dataapi/subgraph_client_test.go b/disperser/dataapi/subgraph_client_test.go
index 9f7e4a3322..4b4e81cf3b 100644
--- a/disperser/dataapi/subgraph_client_test.go
+++ b/disperser/dataapi/subgraph_client_test.go
@@ -330,7 +330,7 @@ var (
},
SocketUpdates: []subgraph.SocketUpdates{
{
- Socket: "localhost:32008;32009;32010",
+ Socket: "localhost:32008;32009;32010;32011",
},
},
}
diff --git a/go.mod b/go.mod
index 46ecc44b2b..43ce8f8edf 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/kms v1.31.0
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6
github.com/consensys/gnark-crypto v0.12.1
+ github.com/docker/go-units v0.5.0
github.com/emirpasic/gods v1.18.1
github.com/ethereum/go-ethereum v1.14.8
github.com/fxamacker/cbor/v2 v2.5.0
@@ -85,7 +86,6 @@ require (
github.com/docker/cli v25.0.3+incompatible // indirect
github.com/docker/docker v25.0.6+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
- github.com/docker/go-units v0.5.0 // indirect
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
github.com/ethereum/go-verkle v0.1.1-0.20240306133620-7d920df305f0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go
index 299c99321b..37cfe72bbb 100644
--- a/inabox/deploy/config.go
+++ b/inabox/deploy/config.go
@@ -385,7 +385,7 @@ func (env *Config) generateRelayVars(ind int, graphUrl, grpcPort string) RelayVa
}
// Generates DA node .env
-func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, metricsPort, nodeApiPort string) OperatorVars {
+func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort, metricsPort, nodeApiPort string) OperatorVars {
max, _ := new(big.Int).SetString("21888242871839275222246405745257275088548364400416034343698204186575808495617", 10)
// max.Exp(big.NewInt(2), big.NewInt(130), nil).Sub(max, big.NewInt(1))
@@ -412,6 +412,7 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath,
NODE_INTERNAL_DISPERSAL_PORT: dispersalPort,
NODE_INTERNAL_RETRIEVAL_PORT: retrievalPort,
NODE_V2_DISPERSAL_PORT: v2DispersalPort,
+ NODE_V2_RETRIEVAL_PORT: v2RetrievalPort,
NODE_ENABLE_METRICS: "true",
NODE_METRICS_PORT: metricsPort,
NODE_ENABLE_NODE_API: "true",
@@ -653,8 +654,9 @@ func (env *Config) GenerateAllVariables() {
dispersalPort := fmt.Sprint(port + 2)
retrievalPort := fmt.Sprint(port + 3)
v2DispersalPort := fmt.Sprint(port + 4)
- nodeApiPort := fmt.Sprint(port + 5)
- port += 6
+ v2RetrievalPort := fmt.Sprint(port + 5)
+ nodeApiPort := fmt.Sprint(port + 6)
+ port += 7
name := fmt.Sprintf("opr%v", i)
logPath, dbPath, filename, envFile := env.getPaths(name)
@@ -662,7 +664,7 @@ func (env *Config) GenerateAllVariables() {
// Convert key to address
- operatorConfig := env.generateOperatorVars(i, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, fmt.Sprint(metricsPort), nodeApiPort)
+ operatorConfig := env.generateOperatorVars(i, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort, fmt.Sprint(metricsPort), nodeApiPort)
writeEnv(operatorConfig.getEnvMap(), envFile)
env.Operators = append(env.Operators, operatorConfig)
diff --git a/inabox/deploy/deploy.go b/inabox/deploy/deploy.go
index db492e11e3..3e0c2a0e48 100644
--- a/inabox/deploy/deploy.go
+++ b/inabox/deploy/deploy.go
@@ -365,7 +365,7 @@ func (env *Config) StopAnvil() {
func (env *Config) RunNodePluginBinary(operation string, operator OperatorVars) {
changeDirectory(filepath.Join(env.rootPath, "inabox"))
- socket := string(core.MakeOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT, operator.NODE_V2_DISPERSAL_PORT))
+ socket := string(core.MakeOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT, operator.NODE_V2_DISPERSAL_PORT, operator.NODE_V2_RETRIEVAL_PORT))
envVars := []string{
"NODE_OPERATION=" + operation,
diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go
index ea2f94c694..252f27ded8 100644
--- a/inabox/deploy/env_vars.go
+++ b/inabox/deploy/env_vars.go
@@ -346,6 +346,8 @@ type OperatorVars struct {
NODE_V2_DISPERSAL_PORT string
+ NODE_V2_RETRIEVAL_PORT string
+
NODE_ENABLE_METRICS string
NODE_METRICS_PORT string
diff --git a/inabox/tests/integration_v2_test.go b/inabox/tests/integration_v2_test.go
index 558467c6d2..1487ebd54a 100644
--- a/inabox/tests/integration_v2_test.go
+++ b/inabox/tests/integration_v2_test.go
@@ -32,7 +32,8 @@ var _ = Describe("Inabox v2 Integration", func() {
defer cancel()
privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded"
- signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyHex)
+ Expect(err).To(BeNil())
disp, err := clients.NewDisperserClient(&clients.DisperserClientConfig{
Hostname: "localhost",
@@ -93,7 +94,7 @@ var _ = Describe("Inabox v2 Integration", func() {
status2, err := dispv2.BlobStatusFromProtobuf(reply2.GetStatus())
Expect(err).To(BeNil())
- if status1 != dispv2.Certified || status2 != dispv2.Certified {
+ if status1 != dispv2.Complete || status2 != dispv2.Complete {
continue
}
diff --git a/node/config.go b/node/config.go
index c1580fa52c..174eca0473 100644
--- a/node/config.go
+++ b/node/config.go
@@ -49,6 +49,7 @@ type Config struct {
InternalRetrievalPort string
InternalDispersalPort string
V2DispersalPort string
+ V2RetrievalPort string
EnableNodeApi bool
NodeApiPort string
EnableMetrics bool
@@ -233,13 +234,13 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
// check if the ports are valid integers
dispersalPort := ctx.GlobalString(flags.DispersalPortFlag.Name)
- _, err = strconv.Atoi(dispersalPort)
+ err = core.ValidatePort(dispersalPort)
if err != nil {
return nil, fmt.Errorf("invalid dispersal port: %s", dispersalPort)
}
retrievalPort := ctx.GlobalString(flags.RetrievalPortFlag.Name)
- _, err = strconv.Atoi(retrievalPort)
+ err = core.ValidatePort(retrievalPort)
if err != nil {
return nil, fmt.Errorf("invalid retrieval port: %s", retrievalPort)
}
@@ -252,12 +253,36 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
return nil, fmt.Errorf("v2 dispersal port (NODE_V2_DISPERSAL_PORT) must be specified if v2 is enabled")
}
} else {
- _, err = strconv.Atoi(v2DispersalPort)
+ if !v2Enabled {
+ return nil, fmt.Errorf("enable v2 flag needs to be set when v2 dispersal port (NODE_V2_DISPERSAL_PORT) is specified")
+ }
+ if v2DispersalPort == dispersalPort {
+ return nil, fmt.Errorf("ensure to v1 and v2 dispersal ports are not the same")
+ }
+ err = core.ValidatePort(v2DispersalPort)
if err != nil {
return nil, fmt.Errorf("invalid v2 dispersal port: %s", v2DispersalPort)
}
}
+ v2RetrievalPort := ctx.GlobalString(flags.V2RetrievalPortFlag.Name)
+ if v2RetrievalPort == "" {
+ if v2Enabled {
+ return nil, fmt.Errorf("v2 retrieval port (NODE_V2_RETRIEVAL_PORT) must be specified if v2 is enabled")
+ }
+ } else {
+ if !v2Enabled {
+ return nil, fmt.Errorf("enable v2 flag needs to be set when v2 retrieval port (NODE_V2_RETRIEVAL_PORT) is specified")
+ }
+ if v2RetrievalPort == retrievalPort {
+ return nil, fmt.Errorf("ensure to v1 and v2 retrieval ports are not the same")
+ }
+ err = core.ValidatePort(v2RetrievalPort)
+ if err != nil {
+ return nil, fmt.Errorf("invalid v2 retrieval port: %s", v2RetrievalPort)
+ }
+ }
+
return &Config{
Hostname: ctx.GlobalString(flags.HostnameFlag.Name),
DispersalPort: dispersalPort,
@@ -265,6 +290,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
InternalDispersalPort: internalDispersalFlag,
InternalRetrievalPort: internalRetrievalFlag,
V2DispersalPort: v2DispersalPort,
+ V2RetrievalPort: v2RetrievalPort,
EnableNodeApi: ctx.GlobalBool(flags.EnableNodeApiFlag.Name),
NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
diff --git a/node/flags/flags.go b/node/flags/flags.go
index da61e9aaff..40340f40d7 100644
--- a/node/flags/flags.go
+++ b/node/flags/flags.go
@@ -54,6 +54,12 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "V2_DISPERSAL_PORT"),
}
+ V2RetrievalPortFlag = cli.StringFlag{
+ Name: common.PrefixFlag(FlagPrefix, "v2-retrieval-port"),
+ Usage: "Port at which node registers to listen for v2 retrieval calls",
+ Required: true,
+ EnvVar: common.PrefixEnvVar(EnvVarPrefix, "V2_RETRIEVAL_PORT"),
+ }
EnableNodeApiFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-node-api"),
Usage: "enable node-api to serve eigenlayer-cli node-api calls",
@@ -438,6 +444,7 @@ var optionalFlags = []cli.Flag{
BLSSignerAPIKeyFlag,
EnableV2Flag,
V2DispersalPortFlag,
+ V2RetrievalPortFlag,
OnchainStateRefreshIntervalFlag,
ChunkDownloadTimeoutFlag,
GRPCMsgSizeLimitV2Flag,
diff --git a/node/grpc/run.go b/node/grpc/run.go
index dd888a7284..32298c5174 100644
--- a/node/grpc/run.go
+++ b/node/grpc/run.go
@@ -54,7 +54,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
// V2 dispersal service
go func() {
if !config.EnableV2 {
- logger.Warn("V2 is not enabled, skipping V2 server startup")
+ logger.Warn("V2 is not enabled, skipping V2 dispersal server startup")
return
}
for {
@@ -92,14 +92,13 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}
opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
- gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())
+ gs := grpc.NewServer(opt)
// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
reflection.Register(gs)
pb.RegisterRetrievalServer(gs, serverV1)
- validator.RegisterRetrievalServer(gs, serverV2)
healthcheck.RegisterHealthServer("node.Retrieval", gs)
logger.Info("port", config.InternalRetrievalPort, "address", listener.Addr().String(), "GRPC Listening")
@@ -109,5 +108,34 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}
}()
+ go func() {
+ if !config.EnableV2 {
+ logger.Warn("V2 is not enabled, skipping V2 retrieval server startup")
+ return
+ }
+ for {
+ addr := fmt.Sprintf("%s:%s", localhost, config.V2RetrievalPort)
+ listener, err := net.Listen("tcp", addr)
+ if err != nil {
+ logger.Fatalf("Could not start tcp listener: %v", err)
+ }
+ opt := grpc.MaxRecvMsgSize(config.GRPCMsgSizeLimitV2)
+ gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())
+
+ // Register reflection service on gRPC server
+ // This makes "grpcurl -plaintext localhost:9000 list" command work
+ reflection.Register(gs)
+
+ validator.RegisterRetrievalServer(gs, serverV2)
+
+ healthcheck.RegisterHealthServer("node.v2.Retrieval", gs)
+
+ logger.Info("port", config.V2RetrievalPort, "address", listener.Addr().String(), "GRPC Listening")
+ if err := gs.Serve(listener); err != nil {
+ logger.Error("retrieval v2 server failed; restarting.", "err", err)
+ }
+ }
+ }()
+
return nil
}
diff --git a/node/node.go b/node/node.go
index fa1f8106ad..abc1829dd1 100644
--- a/node/node.go
+++ b/node/node.go
@@ -184,7 +184,7 @@ func NewNode(
}
nodeLogger.Info("Creating node", "chainID", chainID.String(), "operatorID", config.ID.Hex(),
- "dispersalPort", config.DispersalPort, "v2DispersalPort", config.V2DispersalPort, "retrievalPort", config.RetrievalPort, "churnerUrl", config.ChurnerUrl,
+ "dispersalPort", config.DispersalPort, "v2DispersalPort", config.V2DispersalPort, "retrievalPort", config.RetrievalPort, "v2RetrievalPort", config.V2RetrievalPort, "churnerUrl", config.ChurnerUrl,
"quorumIDs", fmt.Sprint(config.QuorumIDList), "registerNodeAtStart", config.RegisterNodeAtStart, "pubIPCheckInterval", config.PubIPCheckInterval,
"eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding)
@@ -286,12 +286,12 @@ func (n *Node) Start(ctx context.Context) error {
}
// Build the socket based on the hostname/IP provided in the CLI
- socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort))
+ socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort, n.Config.V2RetrievalPort))
var operator *Operator
if n.Config.RegisterNodeAtStart {
n.Logger.Info("Registering node on chain with the following parameters:", "operatorId",
n.Config.ID.Hex(), "hostname", n.Config.Hostname, "dispersalPort", n.Config.DispersalPort, "v2DispersalPort", n.Config.V2DispersalPort,
- "retrievalPort", n.Config.RetrievalPort, "churnerUrl", n.Config.ChurnerUrl, "quorumIds", fmt.Sprint(n.Config.QuorumIDList))
+ "retrievalPort", n.Config.RetrievalPort, "v2RetrievalPort", n.Config.V2RetrievalPort, "churnerUrl", n.Config.ChurnerUrl, "quorumIds", fmt.Sprint(n.Config.QuorumIDList))
privateKey, err := crypto.HexToECDSA(n.Config.EthClientConfig.PrivateKeyString)
if err != nil {
return fmt.Errorf("NewClient: cannot parse private key: %w", err)
@@ -649,7 +649,7 @@ func (n *Node) checkCurrentNodeIp(ctx context.Context) {
case <-ctx.Done():
return
case <-t.C:
- newSocketAddr, err := SocketAddress(ctx, n.PubIPProvider, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort)
+ newSocketAddr, err := SocketAddress(ctx, n.PubIPProvider, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort, n.Config.V2RetrievalPort)
if err != nil {
n.Logger.Error("failed to get socket address", "err", err)
continue
diff --git a/node/plugin/cmd/main.go b/node/plugin/cmd/main.go
index 5d58c1ee17..f1619d7200 100644
--- a/node/plugin/cmd/main.go
+++ b/node/plugin/cmd/main.go
@@ -135,7 +135,7 @@ func pluginOps(ctx *cli.Context) {
return
}
- _, dispersalPort, retrievalPort, v2DispersalPort, err := core.ParseOperatorSocket(config.Socket)
+ _, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort, err := core.ParseOperatorSocket(config.Socket)
if err != nil {
log.Printf("Error: failed to parse operator socket: %v", err)
return
@@ -144,7 +144,7 @@ func pluginOps(ctx *cli.Context) {
socket := config.Socket
if isLocalhost(socket) {
pubIPProvider := pubip.ProviderOrDefault(logger, config.PubIPProvider)
- socket, err = node.SocketAddress(context.Background(), pubIPProvider, dispersalPort, retrievalPort, v2DispersalPort)
+ socket, err = node.SocketAddress(context.Background(), pubIPProvider, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort)
if err != nil {
log.Printf("Error: failed to get socket address from ip provider: %v", err)
return
diff --git a/node/utils.go b/node/utils.go
index ce842e6936..394ba4115b 100644
--- a/node/utils.go
+++ b/node/utils.go
@@ -121,12 +121,12 @@ func ValidatePointsFromBlobHeader(h *pb.BlobHeader) error {
return nil
}
-func SocketAddress(ctx context.Context, provider pubip.Provider, dispersalPort, retrievalPort, v2DispersalPort string) (string, error) {
+func SocketAddress(ctx context.Context, provider pubip.Provider, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort string) (string, error) {
ip, err := provider.PublicIPAddress(ctx)
if err != nil {
return "", fmt.Errorf("failed to get public ip address from IP provider: %w", err)
}
- socket := core.MakeOperatorSocket(ip, dispersalPort, retrievalPort, v2DispersalPort)
+ socket := core.MakeOperatorSocket(ip, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort)
return socket.String(), nil
}
diff --git a/test/integration_test.go b/test/integration_test.go
index 1299fec53a..aa8de6e67c 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -359,6 +359,7 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging
InternalRetrievalPort: op.RetrievalPort,
InternalDispersalPort: op.DispersalPort,
V2DispersalPort: op.V2DispersalPort,
+ V2RetrievalPort: op.V2RetrievalPort,
EnableMetrics: false,
Timeout: 10,
ExpirationPollIntervalSec: 10,
@@ -383,7 +384,7 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging
tx.On("GetBlockStaleMeasure").Return(nil)
tx.On("GetStoreDurationBlocks").Return(nil)
tx.On("OperatorIDToAddress").Return(gethcommon.Address{1}, nil)
- socket := core.MakeOperatorSocket(config.Hostname, config.DispersalPort, config.RetrievalPort, config.V2DispersalPort)
+ socket := core.MakeOperatorSocket(config.Hostname, config.DispersalPort, config.RetrievalPort, config.V2DispersalPort, config.V2RetrievalPort)
tx.On("GetOperatorSocket", mock.Anything, mock.Anything).Return(socket.String(), nil)
noopMetrics := metrics.NewNoopMetrics()
diff --git a/test/v2/test_client.go b/test/v2/test_client.go
index abceb59afa..4c4cdc1fe6 100644
--- a/test/v2/test_client.go
+++ b/test/v2/test_client.go
@@ -97,7 +97,8 @@ func NewTestClient(t *testing.T, config *TestClientConfig) *TestClient {
privateKeyString = strings.Trim(privateKeyString, "\n \t")
privateKeyString, _ = strings.CutPrefix(privateKeyString, "0x")
- signer := auth.NewLocalBlobRequestSigner(privateKeyString)
+ signer, err := auth.NewLocalBlobRequestSigner(privateKeyString)
+ require.NoError(t, err)
signerAccountId, err := signer.GetAccountID()
require.NoError(t, err)
accountId := gethcommon.HexToAddress(signerAccountId)
@@ -191,7 +192,7 @@ func (c *TestClient) DisperseAndVerify(
if err != nil {
return err
}
- blobCert := c.WaitForCertification(ctx, key)
+ blobCert := c.WaitForCompletion(ctx, key)
// Unpad the payload
unpaddedPayload := codec.RemoveEmptyByteFromPaddedBytes(payload)
@@ -216,8 +217,9 @@ func (c *TestClient) DispersePayload(
return key, err
}
-// WaitForCertification waits for a blob to be certified. Returns the blob certificate.
-func (c *TestClient) WaitForCertification(ctx context.Context, key corev2.BlobKey) *commonv2.BlobCertificate {
+// WaitForCompletion waits for a blob to be complete. Returns the blob certificate.
+// TODO: When GATHERING_SIGNATURES is added, this function should be updated to validate it first moves to GATHERING_SIGNATURES before moving to COMPLETE status.
+func (c *TestClient) WaitForCompletion(ctx context.Context, key corev2.BlobKey) *commonv2.BlobCertificate {
var status *v2.BlobStatus = nil
ticker := time.NewTicker(time.Second)
start := time.Now()
@@ -228,11 +230,11 @@ func (c *TestClient) WaitForCertification(ctx context.Context, key corev2.BlobKe
reply, err := c.DisperserClient.GetBlobStatus(ctx, key)
require.NoError(c.t, err)
- if reply.Status == v2.BlobStatus_CERTIFIED {
+ if reply.Status == v2.BlobStatus_COMPLETE {
elapsed := time.Since(statusStart)
totalElapsed := time.Since(start)
fmt.Printf(
- "Blob is certified (spent %0.1fs in prior status, total time %0.1fs)\n",
+ "Blob is complete (spent %0.1fs in prior status, total time %0.1fs)\n",
elapsed.Seconds(),
totalElapsed.Seconds())
@@ -254,8 +256,7 @@ func (c *TestClient) WaitForCertification(ctx context.Context, key corev2.BlobKe
status = &reply.Status
if reply.Status == v2.BlobStatus_FAILED ||
- reply.Status == v2.BlobStatus_UNKNOWN ||
- reply.Status == v2.BlobStatus_INSUFFICIENT_SIGNATURES {
+ reply.Status == v2.BlobStatus_UNKNOWN {
require.Fail(
c.t,
"Blob status is in a terminal non-successful state.",
diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go
index 5772a3502d..330af46def 100644
--- a/tools/traffic/generator_v2.go
+++ b/tools/traffic/generator_v2.go
@@ -57,12 +57,9 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) {
return nil, err
}
- var signer *auth.LocalBlobRequestSigner
- if config.SignerPrivateKey != "" {
- signer = auth.NewLocalBlobRequestSigner(config.SignerPrivateKey)
- } else {
- logger.Error("signer private key is required")
- return nil, fmt.Errorf("signer private key is required")
+ signer, err := auth.NewLocalBlobRequestSigner(config.SignerPrivateKey)
+ if err != nil {
+ return nil, fmt.Errorf("new local blob request signer: %w", err)
}
signerAccountId, err := signer.GetAccountID()