Skip to content

Commit

Permalink
[v2] BlobMetadataStore: Blob Certificate Operations (#844)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Oct 30, 2024
1 parent 43d8c54 commit 2ed86a0
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 50 deletions.
22 changes: 18 additions & 4 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ const (
)

var (
once sync.Once
clientRef *Client
once sync.Once
clientRef *Client
ErrConditionFailed = errors.New("condition failed")
)

type Item = map[string]types.AttributeValue
Expand Down Expand Up @@ -108,11 +109,24 @@ func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err
return nil
}

func (c *Client) PutItemWithCondition(ctx context.Context, tableName string, item Item, condition string) (err error) {
func (c *Client) PutItemWithCondition(
ctx context.Context,
tableName string,
item Item,
condition string,
expressionAttributeNames map[string]string,
expressionAttributeValues map[string]types.AttributeValue,
) (err error) {
_, err = c.dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{
TableName: aws.String(tableName), Item: item,
ConditionExpression: aws.String(condition),
ConditionExpression: aws.String(condition),
ExpressionAttributeNames: expressionAttributeNames,
ExpressionAttributeValues: expressionAttributeValues,
})
var ccfe *types.ConditionalCheckFailedException
if errors.As(err, &ccfe) {
return ErrConditionFailed
}
if err != nil {
return fmt.Errorf("failed to put item in table %s: %w", tableName, err)
}
Expand Down
78 changes: 45 additions & 33 deletions common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,43 +140,42 @@ func TestBasicOperations(t *testing.T) {
createTable(t, tableName)

ctx := context.Background()
err := dynamoClient.PutItem(ctx, tableName,
commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
"RequestedAt": &types.AttributeValueMemberN{Value: "123"},
"SecurityParams": &types.AttributeValueMemberL{
Value: []types.AttributeValue{
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"QuorumID": &types.AttributeValueMemberN{Value: "0"},
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "80"},
},
item := commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
"RequestedAt": &types.AttributeValueMemberN{Value: "123"},
"SecurityParams": &types.AttributeValueMemberL{
Value: []types.AttributeValue{
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"QuorumID": &types.AttributeValueMemberN{Value: "0"},
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "80"},
},
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"QuorumID": &types.AttributeValueMemberN{Value: "1"},
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "70"},
},
},
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"QuorumID": &types.AttributeValueMemberN{Value: "1"},
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "70"},
},
},
},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobKey": &types.AttributeValueMemberS{Value: "blob1"},
"Status": &types.AttributeValueMemberS{Value: "Processing"},
},
)
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobKey": &types.AttributeValueMemberS{Value: "blob1"},
"Status": &types.AttributeValueMemberS{Value: "Processing"},
}
err := dynamoClient.PutItem(ctx, tableName, item)
assert.NoError(t, err)

item, err := dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
fetchedItem, err := dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
})
assert.NoError(t, err)

assert.Equal(t, "key", item["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "123", item["RequestedAt"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "Processing", item["Status"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "blob1", item["BlobKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "123", item["BlobSize"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "key", fetchedItem["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "123", fetchedItem["RequestedAt"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "Processing", fetchedItem["Status"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "blob1", fetchedItem["BlobKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "123", fetchedItem["BlobSize"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, []types.AttributeValue{
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
Expand All @@ -190,7 +189,20 @@ func TestBasicOperations(t *testing.T) {
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "70"},
},
},
}, item["SecurityParams"].(*types.AttributeValueMemberL).Value)
}, fetchedItem["SecurityParams"].(*types.AttributeValueMemberL).Value)

// Attempt to put an item with the same key
err = dynamoClient.PutItemWithCondition(ctx, tableName, commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
"RequestedAt": &types.AttributeValueMemberN{Value: "456"},
}, "attribute_not_exists(MetadataKey)", nil, nil)
assert.ErrorIs(t, err, commondynamodb.ErrConditionFailed)
fetchedItem, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
})
assert.NoError(t, err)
// Shouldn't have been updated
assert.Equal(t, "123", fetchedItem["RequestedAt"].(*types.AttributeValueMemberN).Value)

_, err = dynamoClient.UpdateItem(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
Expand All @@ -210,15 +222,15 @@ func TestBasicOperations(t *testing.T) {
}, "BlobSize", 1000)
assert.NoError(t, err)

item, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
fetchedItem, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
})
assert.NoError(t, err)
assert.Equal(t, "key", item["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "Confirmed", item["Status"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0x123", item["BatchHeaderHash"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0", item["BlobIndex"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "1123", item["BlobSize"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "key", fetchedItem["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "Confirmed", fetchedItem["Status"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0x123", fetchedItem["BatchHeaderHash"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0", fetchedItem["BlobIndex"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "1123", fetchedItem["BlobSize"].(*types.AttributeValueMemberN).Value)

err = dynamoClient.DeleteTable(ctx, tableName)
assert.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions core/v2/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, r
}

header := corev2.BlobCertificate{
BlobHeader: corev2.BlobHeader{
BlobHeader: &corev2.BlobHeader{
BlobVersion: version,
QuorumNumbers: quorums,
BlobCommitments: commitments,
Expand All @@ -114,7 +114,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, r

// prepareBlobs takes in multiple blob, encodes them, generates the associated assignments, and the batch header.
// These are the products that a disperser will need in order to disperse data to the DA nodes.
func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertificate, blobs [][]byte) (map[core.OperatorID][]*corev2.BlobShard, core.IndexedChainState) {
func prepareBlobs(t *testing.T, operatorCount uint, certs []corev2.BlobCertificate, blobs [][]byte) (map[core.OperatorID][]*corev2.BlobShard, core.IndexedChainState) {

cst, err := mock.MakeChainDataMock(map[uint8]int{
0: int(operatorCount),
Expand All @@ -123,11 +123,11 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi
})
assert.NoError(t, err)

blobsMap := make([]map[core.QuorumID]map[core.OperatorID][]*encoding.Frame, 0, len(headers))

for z, header := range headers {
blobsMap := make([]map[core.QuorumID]map[core.OperatorID][]*encoding.Frame, 0, len(certs))

for z, cert := range certs {
blob := blobs[z]
header := cert.BlobHeader

params, err := header.GetEncodingParams()
if err != nil {
Expand All @@ -139,7 +139,7 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi
t.Fatal(err)
}

state, err := cst.GetOperatorState(context.Background(), uint(header.ReferenceBlockNumber), header.QuorumNumbers)
state, err := cst.GetOperatorState(context.Background(), uint(cert.ReferenceBlockNumber), header.QuorumNumbers)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi
}
if len(inverseMap[operatorID]) < blobIndex+1 {
inverseMap[operatorID] = append(inverseMap[operatorID], &corev2.BlobShard{
BlobCertificate: headers[blobIndex],
BlobCertificate: certs[blobIndex],
Chunks: make(map[core.QuorumID][]*encoding.Frame),
})
}
Expand Down
6 changes: 4 additions & 2 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,16 @@ func (b *BlobHeader) BlobKey() (BlobKey, error) {
return headerHash, nil
}

type RelayKey uint16

type BlobCertificate struct {
BlobHeader
BlobHeader *BlobHeader

// ReferenceBlockNumber is the block number of the block at which the operator state will be referenced
ReferenceBlockNumber uint64

// RelayKeys
RelayKeys []uint16
RelayKeys []RelayKey
}

type BlobVersionParameters struct {
Expand Down
6 changes: 3 additions & 3 deletions core/v2/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar
}

// Get the assignments for the quorum
assignment, err := GetAssignment(operatorState, blob.BlobVersion, quorum, v.operatorID)
assignment, err := GetAssignment(operatorState, blob.BlobHeader.BlobVersion, quorum, v.operatorID)
if err != nil {
return nil, nil, err
}
Expand All @@ -57,7 +57,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar
}

// Validate the chunkLength against the confirmation and adversary threshold parameters
chunkLength, err := GetChunkLength(blob.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length))
chunkLength, err := GetChunkLength(blob.BlobHeader.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length))
if err != nil {
return nil, nil, fmt.Errorf("invalid chunk length: %w", err)
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard,
return err
}
// TODO: Define params for the blob
params, err := blob.GetEncodingParams()
params, err := blob.BlobHeader.GetEncodingParams()
if err != nil {
return err
}
Expand Down
75 changes: 74 additions & 1 deletion disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blobstore

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -25,6 +26,7 @@ const (

blobKeyPrefix = "BlobKey#"
blobMetadataSK = "BlobMetadata"
blobCertSK = "BlobCertificate"
)

// BlobMetadataStore is a blob metadata storage backed by DynamoDB
Expand All @@ -49,7 +51,12 @@ func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v
return err
}

return s.dynamoDBClient.PutItem(ctx, s.tableName, item)
err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
if errors.Is(err, commondynamodb.ErrConditionFailed) {
return disperser.ErrAlreadyExists
}

return err
}

func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) {
Expand Down Expand Up @@ -118,6 +125,46 @@ func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, st
return count, nil
}

func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate) error {
item, err := MarshalBlobCertificate(blobCert)
if err != nil {
return err
}

err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
if errors.Is(err, commondynamodb.ErrConditionFailed) {
return disperser.ErrAlreadyExists
}

return err
}

func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: blobKeyPrefix + blobKey.Hex(),
},
"SK": &types.AttributeValueMemberS{
Value: blobCertSK,
},
})

if err != nil {
return nil, err
}

if item == nil {
return nil, fmt.Errorf("%w: certificate not found for key %s", disperser.ErrMetadataNotFound, blobKey.Hex())
}

cert, err := UnmarshalBlobCertificate(item)
if err != nil {
return nil, err
}

return cert, nil
}

func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput {
return &dynamodb.CreateTableInput{
AttributeDefinitions: []types.AttributeDefinition{
Expand Down Expand Up @@ -272,3 +319,29 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) {
}
return &metadata, nil
}

func MarshalBlobCertificate(blobCert *core.BlobCertificate) (commondynamodb.Item, error) {
fields, err := attributevalue.MarshalMap(blobCert)
if err != nil {
return nil, fmt.Errorf("failed to marshal blob certificate: %w", err)
}

// Add PK and SK fields
blobKey, err := blobCert.BlobHeader.BlobKey()
if err != nil {
return nil, err
}
fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()}
fields["SK"] = &types.AttributeValueMemberS{Value: blobCertSK}

return fields, nil
}

func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, error) {
cert := core.BlobCertificate{}
err := attributevalue.UnmarshalMap(item, &cert)
if err != nil {
return nil, err
}
return &cert, nil
}
Loading

0 comments on commit 2ed86a0

Please sign in to comment.