diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index bf9000b392..e50c4afbf0 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -33,8 +33,9 @@ const ( ) var ( - once sync.Once - clientRef *Client + once sync.Once + clientRef *Client + ErrConditionFailed = errors.New("condition failed") ) type Item = map[string]types.AttributeValue @@ -108,11 +109,24 @@ func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err return nil } -func (c *Client) PutItemWithCondition(ctx context.Context, tableName string, item Item, condition string) (err error) { +func (c *Client) PutItemWithCondition( + ctx context.Context, + tableName string, + item Item, + condition string, + expressionAttributeNames map[string]string, + expressionAttributeValues map[string]types.AttributeValue, +) (err error) { _, err = c.dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{ TableName: aws.String(tableName), Item: item, - ConditionExpression: aws.String(condition), + ConditionExpression: aws.String(condition), + ExpressionAttributeNames: expressionAttributeNames, + ExpressionAttributeValues: expressionAttributeValues, }) + var ccfe *types.ConditionalCheckFailedException + if errors.As(err, &ccfe) { + return ErrConditionFailed + } if err != nil { return fmt.Errorf("failed to put item in table %s: %w", tableName, err) } diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go index 1b7a2bdf9e..4089fa9880 100644 --- a/common/aws/dynamodb/client_test.go +++ b/common/aws/dynamodb/client_test.go @@ -140,43 +140,42 @@ func TestBasicOperations(t *testing.T) { createTable(t, tableName) ctx := context.Background() - err := dynamoClient.PutItem(ctx, tableName, - commondynamodb.Item{ - "MetadataKey": &types.AttributeValueMemberS{Value: "key"}, - "RequestedAt": &types.AttributeValueMemberN{Value: "123"}, - "SecurityParams": &types.AttributeValueMemberL{ - Value: []types.AttributeValue{ - &types.AttributeValueMemberM{ - Value: map[string]types.AttributeValue{ - "QuorumID": &types.AttributeValueMemberN{Value: "0"}, - "AdversaryThreshold": &types.AttributeValueMemberN{Value: "80"}, - }, + item := commondynamodb.Item{ + "MetadataKey": &types.AttributeValueMemberS{Value: "key"}, + "RequestedAt": &types.AttributeValueMemberN{Value: "123"}, + "SecurityParams": &types.AttributeValueMemberL{ + Value: []types.AttributeValue{ + &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "QuorumID": &types.AttributeValueMemberN{Value: "0"}, + "AdversaryThreshold": &types.AttributeValueMemberN{Value: "80"}, }, - &types.AttributeValueMemberM{ - Value: map[string]types.AttributeValue{ - "QuorumID": &types.AttributeValueMemberN{Value: "1"}, - "AdversaryThreshold": &types.AttributeValueMemberN{Value: "70"}, - }, + }, + &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "QuorumID": &types.AttributeValueMemberN{Value: "1"}, + "AdversaryThreshold": &types.AttributeValueMemberN{Value: "70"}, }, }, }, - "BlobSize": &types.AttributeValueMemberN{Value: "123"}, - "BlobKey": &types.AttributeValueMemberS{Value: "blob1"}, - "Status": &types.AttributeValueMemberS{Value: "Processing"}, }, - ) + "BlobSize": &types.AttributeValueMemberN{Value: "123"}, + "BlobKey": &types.AttributeValueMemberS{Value: "blob1"}, + "Status": &types.AttributeValueMemberS{Value: "Processing"}, + } + err := dynamoClient.PutItem(ctx, tableName, item) assert.NoError(t, err) - item, err := dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + fetchedItem, err := dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ "MetadataKey": &types.AttributeValueMemberS{Value: "key"}, }) assert.NoError(t, err) - assert.Equal(t, "key", item["MetadataKey"].(*types.AttributeValueMemberS).Value) - assert.Equal(t, "123", item["RequestedAt"].(*types.AttributeValueMemberN).Value) - assert.Equal(t, "Processing", item["Status"].(*types.AttributeValueMemberS).Value) - assert.Equal(t, "blob1", item["BlobKey"].(*types.AttributeValueMemberS).Value) - assert.Equal(t, "123", item["BlobSize"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "key", fetchedItem["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.Equal(t, "123", fetchedItem["RequestedAt"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "Processing", fetchedItem["Status"].(*types.AttributeValueMemberS).Value) + assert.Equal(t, "blob1", fetchedItem["BlobKey"].(*types.AttributeValueMemberS).Value) + assert.Equal(t, "123", fetchedItem["BlobSize"].(*types.AttributeValueMemberN).Value) assert.Equal(t, []types.AttributeValue{ &types.AttributeValueMemberM{ Value: map[string]types.AttributeValue{ @@ -190,7 +189,20 @@ func TestBasicOperations(t *testing.T) { "AdversaryThreshold": &types.AttributeValueMemberN{Value: "70"}, }, }, - }, item["SecurityParams"].(*types.AttributeValueMemberL).Value) + }, fetchedItem["SecurityParams"].(*types.AttributeValueMemberL).Value) + + // Attempt to put an item with the same key + err = dynamoClient.PutItemWithCondition(ctx, tableName, commondynamodb.Item{ + "MetadataKey": &types.AttributeValueMemberS{Value: "key"}, + "RequestedAt": &types.AttributeValueMemberN{Value: "456"}, + }, "attribute_not_exists(MetadataKey)", nil, nil) + assert.ErrorIs(t, err, commondynamodb.ErrConditionFailed) + fetchedItem, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + "MetadataKey": &types.AttributeValueMemberS{Value: "key"}, + }) + assert.NoError(t, err) + // Shouldn't have been updated + assert.Equal(t, "123", fetchedItem["RequestedAt"].(*types.AttributeValueMemberN).Value) _, err = dynamoClient.UpdateItem(ctx, tableName, commondynamodb.Key{ "MetadataKey": &types.AttributeValueMemberS{Value: "key"}, @@ -210,15 +222,15 @@ func TestBasicOperations(t *testing.T) { }, "BlobSize", 1000) assert.NoError(t, err) - item, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + fetchedItem, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ "MetadataKey": &types.AttributeValueMemberS{Value: "key"}, }) assert.NoError(t, err) - assert.Equal(t, "key", item["MetadataKey"].(*types.AttributeValueMemberS).Value) - assert.Equal(t, "Confirmed", item["Status"].(*types.AttributeValueMemberS).Value) - assert.Equal(t, "0x123", item["BatchHeaderHash"].(*types.AttributeValueMemberS).Value) - assert.Equal(t, "0", item["BlobIndex"].(*types.AttributeValueMemberN).Value) - assert.Equal(t, "1123", item["BlobSize"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "key", fetchedItem["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.Equal(t, "Confirmed", fetchedItem["Status"].(*types.AttributeValueMemberS).Value) + assert.Equal(t, "0x123", fetchedItem["BatchHeaderHash"].(*types.AttributeValueMemberS).Value) + assert.Equal(t, "0", fetchedItem["BlobIndex"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "1123", fetchedItem["BlobSize"].(*types.AttributeValueMemberN).Value) err = dynamoClient.DeleteTable(ctx, tableName) assert.NoError(t, err) diff --git a/core/v2/core_test.go b/core/v2/core_test.go index 4d893bc3f9..d5b130b0cc 100644 --- a/core/v2/core_test.go +++ b/core/v2/core_test.go @@ -100,7 +100,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, r } header := corev2.BlobCertificate{ - BlobHeader: corev2.BlobHeader{ + BlobHeader: &corev2.BlobHeader{ BlobVersion: version, QuorumNumbers: quorums, BlobCommitments: commitments, @@ -114,7 +114,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, r // prepareBlobs takes in multiple blob, encodes them, generates the associated assignments, and the batch header. // These are the products that a disperser will need in order to disperse data to the DA nodes. -func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertificate, blobs [][]byte) (map[core.OperatorID][]*corev2.BlobShard, core.IndexedChainState) { +func prepareBlobs(t *testing.T, operatorCount uint, certs []corev2.BlobCertificate, blobs [][]byte) (map[core.OperatorID][]*corev2.BlobShard, core.IndexedChainState) { cst, err := mock.MakeChainDataMock(map[uint8]int{ 0: int(operatorCount), @@ -123,11 +123,11 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi }) assert.NoError(t, err) - blobsMap := make([]map[core.QuorumID]map[core.OperatorID][]*encoding.Frame, 0, len(headers)) - - for z, header := range headers { + blobsMap := make([]map[core.QuorumID]map[core.OperatorID][]*encoding.Frame, 0, len(certs)) + for z, cert := range certs { blob := blobs[z] + header := cert.BlobHeader params, err := header.GetEncodingParams() if err != nil { @@ -139,7 +139,7 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi t.Fatal(err) } - state, err := cst.GetOperatorState(context.Background(), uint(header.ReferenceBlockNumber), header.QuorumNumbers) + state, err := cst.GetOperatorState(context.Background(), uint(cert.ReferenceBlockNumber), header.QuorumNumbers) if err != nil { t.Fatal(err) } @@ -177,7 +177,7 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi } if len(inverseMap[operatorID]) < blobIndex+1 { inverseMap[operatorID] = append(inverseMap[operatorID], &corev2.BlobShard{ - BlobCertificate: headers[blobIndex], + BlobCertificate: certs[blobIndex], Chunks: make(map[core.QuorumID][]*encoding.Frame), }) } diff --git a/core/v2/types.go b/core/v2/types.go index ac3b234ef4..bc81fad16a 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -230,14 +230,16 @@ func (b *BlobHeader) BlobKey() (BlobKey, error) { return headerHash, nil } +type RelayKey uint16 + type BlobCertificate struct { - BlobHeader + BlobHeader *BlobHeader // ReferenceBlockNumber is the block number of the block at which the operator state will be referenced ReferenceBlockNumber uint64 // RelayKeys - RelayKeys []uint16 + RelayKeys []RelayKey } type BlobVersionParameters struct { diff --git a/core/v2/validator.go b/core/v2/validator.go index db513c3191..9b96f04c57 100644 --- a/core/v2/validator.go +++ b/core/v2/validator.go @@ -43,7 +43,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Get the assignments for the quorum - assignment, err := GetAssignment(operatorState, blob.BlobVersion, quorum, v.operatorID) + assignment, err := GetAssignment(operatorState, blob.BlobHeader.BlobVersion, quorum, v.operatorID) if err != nil { return nil, nil, err } @@ -57,7 +57,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Validate the chunkLength against the confirmation and adversary threshold parameters - chunkLength, err := GetChunkLength(blob.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length)) + chunkLength, err := GetChunkLength(blob.BlobHeader.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length)) if err != nil { return nil, nil, fmt.Errorf("invalid chunk length: %w", err) } @@ -98,7 +98,7 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, return err } // TODO: Define params for the blob - params, err := blob.GetEncodingParams() + params, err := blob.BlobHeader.GetEncodingParams() if err != nil { return err } diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go index 40424fe9d3..2d745ee715 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go @@ -2,6 +2,7 @@ package blobstore import ( "context" + "errors" "fmt" "strconv" "strings" @@ -25,6 +26,7 @@ const ( blobKeyPrefix = "BlobKey#" blobMetadataSK = "BlobMetadata" + blobCertSK = "BlobCertificate" ) // BlobMetadataStore is a blob metadata storage backed by DynamoDB @@ -49,7 +51,12 @@ func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v return err } - return s.dynamoDBClient.PutItem(ctx, s.tableName, item) + err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil) + if errors.Is(err, commondynamodb.ErrConditionFailed) { + return disperser.ErrAlreadyExists + } + + return err } func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) { @@ -118,6 +125,46 @@ func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, st return count, nil } +func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate) error { + item, err := MarshalBlobCertificate(blobCert) + if err != nil { + return err + } + + err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil) + if errors.Is(err, commondynamodb.ErrConditionFailed) { + return disperser.ErrAlreadyExists + } + + return err +} + +func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, error) { + item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: blobKeyPrefix + blobKey.Hex(), + }, + "SK": &types.AttributeValueMemberS{ + Value: blobCertSK, + }, + }) + + if err != nil { + return nil, err + } + + if item == nil { + return nil, fmt.Errorf("%w: certificate not found for key %s", disperser.ErrMetadataNotFound, blobKey.Hex()) + } + + cert, err := UnmarshalBlobCertificate(item) + if err != nil { + return nil, err + } + + return cert, nil +} + func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { return &dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ @@ -272,3 +319,29 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { } return &metadata, nil } + +func MarshalBlobCertificate(blobCert *core.BlobCertificate) (commondynamodb.Item, error) { + fields, err := attributevalue.MarshalMap(blobCert) + if err != nil { + return nil, fmt.Errorf("failed to marshal blob certificate: %w", err) + } + + // Add PK and SK fields + blobKey, err := blobCert.BlobHeader.BlobKey() + if err != nil { + return nil, err + } + fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()} + fields["SK"] = &types.AttributeValueMemberS{Value: blobCertSK} + + return fields, nil +} + +func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, error) { + cert := core.BlobCertificate{} + err := attributevalue.UnmarshalMap(item, &cert) + if err != nil { + return nil, err + } + return &cert, nil +} diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go index b87a2b8090..04abf3451b 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go @@ -9,6 +9,7 @@ import ( commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" core "github.com/Layr-Labs/eigenda/core" corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/disperser" v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/stretchr/testify/assert" @@ -81,6 +82,10 @@ func TestBlobMetadataStoreOperations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int32(1), queuedCount) + // attempt to put metadata with the same key should fail + err = blobMetadataStore.PutBlobMetadata(ctx, metadata1) + assert.ErrorIs(t, err, disperser.ErrAlreadyExists) + deleteItems(t, []commondynamodb.Key{ { "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey1.Hex()}, @@ -93,6 +98,59 @@ func TestBlobMetadataStoreOperations(t *testing.T) { }) } +func TestBlobMetadataStoreCerts(t *testing.T) { + ctx := context.Background() + blobCert := &corev2.BlobCertificate{ + BlobHeader: &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 0, + CumulativePayment: big.NewInt(532), + }, + Signature: []byte("signature"), + }, + ReferenceBlockNumber: uint64(100), + RelayKeys: []corev2.RelayKey{0, 2, 4}, + } + err := blobMetadataStore.PutBlobCertificate(ctx, blobCert) + assert.NoError(t, err) + + blobKey, err := blobCert.BlobHeader.BlobKey() + assert.NoError(t, err) + fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey) + assert.NoError(t, err) + assert.Equal(t, blobCert, fetchedCert) + + // blob cert with the same key should fail + blobCert1 := &corev2.BlobCertificate{ + BlobHeader: &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 0, + CumulativePayment: big.NewInt(532), + }, + Signature: []byte("signature"), + }, + ReferenceBlockNumber: uint64(1234), + RelayKeys: []corev2.RelayKey{0}, + } + err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1) + assert.ErrorIs(t, err, disperser.ErrAlreadyExists) + + deleteItems(t, []commondynamodb.Key{ + { + "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey.Hex()}, + "SK": &types.AttributeValueMemberS{Value: "BlobCertificate"}, + }, + }) +} + func deleteItems(t *testing.T, keys []commondynamodb.Key) { failed, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) assert.NoError(t, err) diff --git a/disperser/errors.go b/disperser/errors.go index 2872dbd100..06f5a43c03 100644 --- a/disperser/errors.go +++ b/disperser/errors.go @@ -5,4 +5,5 @@ import "errors" var ( ErrBlobNotFound = errors.New("blob not found") ErrMetadataNotFound = errors.New("metadata not found") + ErrAlreadyExists = errors.New("record already exists") )