Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] BlobMetadataStore: Blob Certificate Operations #844

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ const (
)

var (
once sync.Once
clientRef *Client
once sync.Once
clientRef *Client
ErrConditionFailed = errors.New("condition failed")
)

type Item = map[string]types.AttributeValue
Expand Down Expand Up @@ -108,11 +109,24 @@ func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err
return nil
}

func (c *Client) PutItemWithCondition(ctx context.Context, tableName string, item Item, condition string) (err error) {
func (c *Client) PutItemWithCondition(
ctx context.Context,
tableName string,
item Item,
condition string,
expressionAttributeNames map[string]string,
expressionAttributeValues map[string]types.AttributeValue,
) (err error) {
_, err = c.dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{
TableName: aws.String(tableName), Item: item,
ConditionExpression: aws.String(condition),
ConditionExpression: aws.String(condition),
ExpressionAttributeNames: expressionAttributeNames,
ExpressionAttributeValues: expressionAttributeValues,
})
var ccfe *types.ConditionalCheckFailedException
if errors.As(err, &ccfe) {
return ErrConditionFailed
}
if err != nil {
return fmt.Errorf("failed to put item in table %s: %w", tableName, err)
}
Expand Down
78 changes: 45 additions & 33 deletions common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,43 +140,42 @@ func TestBasicOperations(t *testing.T) {
createTable(t, tableName)

ctx := context.Background()
err := dynamoClient.PutItem(ctx, tableName,
commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
"RequestedAt": &types.AttributeValueMemberN{Value: "123"},
"SecurityParams": &types.AttributeValueMemberL{
Value: []types.AttributeValue{
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"QuorumID": &types.AttributeValueMemberN{Value: "0"},
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "80"},
},
item := commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
"RequestedAt": &types.AttributeValueMemberN{Value: "123"},
"SecurityParams": &types.AttributeValueMemberL{
Value: []types.AttributeValue{
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"QuorumID": &types.AttributeValueMemberN{Value: "0"},
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "80"},
},
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"QuorumID": &types.AttributeValueMemberN{Value: "1"},
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "70"},
},
},
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"QuorumID": &types.AttributeValueMemberN{Value: "1"},
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "70"},
},
},
},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobKey": &types.AttributeValueMemberS{Value: "blob1"},
"Status": &types.AttributeValueMemberS{Value: "Processing"},
},
)
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobKey": &types.AttributeValueMemberS{Value: "blob1"},
"Status": &types.AttributeValueMemberS{Value: "Processing"},
}
err := dynamoClient.PutItem(ctx, tableName, item)
assert.NoError(t, err)

item, err := dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
fetchedItem, err := dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
})
assert.NoError(t, err)

assert.Equal(t, "key", item["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "123", item["RequestedAt"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "Processing", item["Status"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "blob1", item["BlobKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "123", item["BlobSize"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "key", fetchedItem["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "123", fetchedItem["RequestedAt"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "Processing", fetchedItem["Status"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "blob1", fetchedItem["BlobKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "123", fetchedItem["BlobSize"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, []types.AttributeValue{
&types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
Expand All @@ -190,7 +189,20 @@ func TestBasicOperations(t *testing.T) {
"AdversaryThreshold": &types.AttributeValueMemberN{Value: "70"},
},
},
}, item["SecurityParams"].(*types.AttributeValueMemberL).Value)
}, fetchedItem["SecurityParams"].(*types.AttributeValueMemberL).Value)

// Attempt to put an item with the same key
err = dynamoClient.PutItemWithCondition(ctx, tableName, commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
"RequestedAt": &types.AttributeValueMemberN{Value: "456"},
}, "attribute_not_exists(MetadataKey)", nil, nil)
assert.ErrorIs(t, err, commondynamodb.ErrConditionFailed)
fetchedItem, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
})
assert.NoError(t, err)
// Shouldn't have been updated
assert.Equal(t, "123", fetchedItem["RequestedAt"].(*types.AttributeValueMemberN).Value)

_, err = dynamoClient.UpdateItem(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
Expand All @@ -210,15 +222,15 @@ func TestBasicOperations(t *testing.T) {
}, "BlobSize", 1000)
assert.NoError(t, err)

item, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
fetchedItem, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
})
assert.NoError(t, err)
assert.Equal(t, "key", item["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "Confirmed", item["Status"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0x123", item["BatchHeaderHash"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0", item["BlobIndex"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "1123", item["BlobSize"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "key", fetchedItem["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "Confirmed", fetchedItem["Status"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0x123", fetchedItem["BatchHeaderHash"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0", fetchedItem["BlobIndex"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "1123", fetchedItem["BlobSize"].(*types.AttributeValueMemberN).Value)

err = dynamoClient.DeleteTable(ctx, tableName)
assert.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions core/v2/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, r
}

header := corev2.BlobCertificate{
BlobHeader: corev2.BlobHeader{
BlobHeader: &corev2.BlobHeader{
BlobVersion: version,
QuorumNumbers: quorums,
BlobCommitments: commitments,
Expand All @@ -114,7 +114,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, r

// prepareBlobs takes in multiple blob, encodes them, generates the associated assignments, and the batch header.
// These are the products that a disperser will need in order to disperse data to the DA nodes.
func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertificate, blobs [][]byte) (map[core.OperatorID][]*corev2.BlobShard, core.IndexedChainState) {
func prepareBlobs(t *testing.T, operatorCount uint, certs []corev2.BlobCertificate, blobs [][]byte) (map[core.OperatorID][]*corev2.BlobShard, core.IndexedChainState) {

cst, err := mock.MakeChainDataMock(map[uint8]int{
0: int(operatorCount),
Expand All @@ -123,11 +123,11 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi
})
assert.NoError(t, err)

blobsMap := make([]map[core.QuorumID]map[core.OperatorID][]*encoding.Frame, 0, len(headers))

for z, header := range headers {
blobsMap := make([]map[core.QuorumID]map[core.OperatorID][]*encoding.Frame, 0, len(certs))

for z, cert := range certs {
blob := blobs[z]
header := cert.BlobHeader

params, err := header.GetEncodingParams()
if err != nil {
Expand All @@ -139,7 +139,7 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi
t.Fatal(err)
}

state, err := cst.GetOperatorState(context.Background(), uint(header.ReferenceBlockNumber), header.QuorumNumbers)
state, err := cst.GetOperatorState(context.Background(), uint(cert.ReferenceBlockNumber), header.QuorumNumbers)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi
}
if len(inverseMap[operatorID]) < blobIndex+1 {
inverseMap[operatorID] = append(inverseMap[operatorID], &corev2.BlobShard{
BlobCertificate: headers[blobIndex],
BlobCertificate: certs[blobIndex],
Chunks: make(map[core.QuorumID][]*encoding.Frame),
})
}
Expand Down
6 changes: 4 additions & 2 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,16 @@ func (b *BlobHeader) BlobKey() (BlobKey, error) {
return headerHash, nil
}

type RelayKey uint16

type BlobCertificate struct {
BlobHeader
BlobHeader *BlobHeader

// ReferenceBlockNumber is the block number of the block at which the operator state will be referenced
ReferenceBlockNumber uint64

// RelayKeys
RelayKeys []uint16
RelayKeys []RelayKey
}

type BlobVersionParameters struct {
Expand Down
6 changes: 3 additions & 3 deletions core/v2/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar
}

// Get the assignments for the quorum
assignment, err := GetAssignment(operatorState, blob.BlobVersion, quorum, v.operatorID)
assignment, err := GetAssignment(operatorState, blob.BlobHeader.BlobVersion, quorum, v.operatorID)
if err != nil {
return nil, nil, err
}
Expand All @@ -57,7 +57,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar
}

// Validate the chunkLength against the confirmation and adversary threshold parameters
chunkLength, err := GetChunkLength(blob.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length))
chunkLength, err := GetChunkLength(blob.BlobHeader.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length))
if err != nil {
return nil, nil, fmt.Errorf("invalid chunk length: %w", err)
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard,
return err
}
// TODO: Define params for the blob
params, err := blob.GetEncodingParams()
params, err := blob.BlobHeader.GetEncodingParams()
if err != nil {
return err
}
Expand Down
75 changes: 74 additions & 1 deletion disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blobstore

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -25,6 +26,7 @@ const (

blobKeyPrefix = "BlobKey#"
blobMetadataSK = "BlobMetadata"
blobCertSK = "BlobCertificate"
)

// BlobMetadataStore is a blob metadata storage backed by DynamoDB
Expand All @@ -49,7 +51,12 @@ func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v
return err
}

return s.dynamoDBClient.PutItem(ctx, s.tableName, item)
err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
if errors.Is(err, commondynamodb.ErrConditionFailed) {
return disperser.ErrAlreadyExists
}

return err
}

func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) {
Expand Down Expand Up @@ -118,6 +125,46 @@ func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, st
return count, nil
}

func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate) error {
item, err := MarshalBlobCertificate(blobCert)
if err != nil {
return err
}

err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
if errors.Is(err, commondynamodb.ErrConditionFailed) {
return disperser.ErrAlreadyExists
}

return err
}

func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: blobKeyPrefix + blobKey.Hex(),
},
"SK": &types.AttributeValueMemberS{
Value: blobCertSK,
},
})

if err != nil {
return nil, err
}

if item == nil {
return nil, fmt.Errorf("%w: certificate not found for key %s", disperser.ErrMetadataNotFound, blobKey.Hex())
}

cert, err := UnmarshalBlobCertificate(item)
if err != nil {
return nil, err
}

return cert, nil
}

func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput {
return &dynamodb.CreateTableInput{
AttributeDefinitions: []types.AttributeDefinition{
Expand Down Expand Up @@ -272,3 +319,29 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) {
}
return &metadata, nil
}

func MarshalBlobCertificate(blobCert *core.BlobCertificate) (commondynamodb.Item, error) {
fields, err := attributevalue.MarshalMap(blobCert)
if err != nil {
return nil, fmt.Errorf("failed to marshal blob certificate: %w", err)
}

// Add PK and SK fields
blobKey, err := blobCert.BlobHeader.BlobKey()
if err != nil {
return nil, err
}
fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()}
fields["SK"] = &types.AttributeValueMemberS{Value: blobCertSK}

return fields, nil
}

func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, error) {
cert := core.BlobCertificate{}
err := attributevalue.UnmarshalMap(item, &cert)
if err != nil {
return nil, err
}
return &cert, nil
}
Loading
Loading