Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DataApi] Get BlobCount By AccountID #541

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion disperser/apiserver/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func simulateClient(t *testing.T, signer core.BlobRequestSigner, origin string,

authHeader := core.BlobAuthHeader{
BlobCommitments: encoding.BlobCommitments{},
AccountID: "",
AccountID: "test",
Nonce: authHeaderReply.BlobAuthHeader.ChallengeParameter,
}

Expand Down
5 changes: 5 additions & 0 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,11 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
// Update the quorum rate
blob.RequestHeader.SecurityParams[i].QuorumRate = accountRates.Throughput

// Update AccountID to accountKey
// This is a combination of origin and authenticatedAddress
// AccountId is later used to track blobs sent by the same account
blob.RequestHeader.BlobAuthHeader.AccountID = accountKey
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might need to change depending on if the authenticated endpoint is being used right ?


// Get the encoded blob size from the blob header. Calculation is done in a way that nodes can replicate
encodedLength := encoding.GetEncodedBlobLength(length, uint8(param.ConfirmationThreshold), uint8(param.AdversaryThreshold))
encodedSize := encoding.GetBlobSize(encodedLength)
Expand Down
4 changes: 4 additions & 0 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,9 +730,13 @@ func simulateBlobConfirmation(t *testing.T, requestID []byte, blobSize uint, sec
BlobStatus: disperser.Processing,
Expiry: 0,
NumRetries: 0,
AccountID: "test",
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
BlobAuthHeader: core.BlobAuthHeader{
AccountID: "test",
},
},
RequestedAt: requestedAt,
BlobSize: blobSize,
Expand Down
45 changes: 43 additions & 2 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -16,8 +17,9 @@ import (
)

const (
statusIndexName = "StatusIndex"
batchIndexName = "BatchIndex"
statusIndexName = "StatusIndex"
batchIndexName = "BatchIndex"
accountIdIndexName = "AccountIdIndex"
)

// BlobMetadataStore is a blob metadata storage backed by DynamoDB
Expand Down Expand Up @@ -218,6 +220,21 @@ func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHea
return metadata, nil
}

// GetBlobMetadataByAccount Count returns the count of all the metadata with the given status
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented.
func (s *BlobMetadataStore) GetBlobMetadataCountByAccountID(ctx context.Context, accountID core.AccountID) (int32, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want the total amount of data rather than the total number of blobs. Or perhaps both.

Let's do another sync on the exact use case for this work!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mooselumph i think if we want total amount of data everytime than better approach is to have a lambda invoke on DynamodB stream.....which only processes INSERT_EVENT and can be used to update:

  1. Amount of Data
  2. Count of blobs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now temporarily i will just update Batcher to update amount of data after Blob is confirmed and increment count

count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, accountIdIndexName, "AccountID = :accountID", commondynamodb.ExpresseionValues{
":accountID": &types.AttributeValueMemberS{
Value: accountID,
}})
if err != nil {
return 0, err
}

return count, nil
}

func (s *BlobMetadataStore) IncrementNumRetries(ctx context.Context, existingMetadata *disperser.BlobMetadata) error {
_, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{
"BlobHash": &types.AttributeValueMemberS{
Expand Down Expand Up @@ -297,6 +314,10 @@ func GenerateTableSchema(metadataTableName string, readCapacityUnits int64, writ
AttributeName: aws.String("BlobIndex"),
AttributeType: types.ScalarAttributeTypeN,
},
{
AttributeName: aws.String("AccountID"),
AttributeType: types.ScalarAttributeTypeS,
},
},
KeySchema: []types.KeySchemaElement{
{
Expand Down Expand Up @@ -350,6 +371,26 @@ func GenerateTableSchema(metadataTableName string, readCapacityUnits int64, writ
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
{
IndexName: aws.String(accountIdIndexName),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("AccountID"),
KeyType: types.KeyTypeHash,
},
{
AttributeName: aws.String("RequestedAt"),
KeyType: types.KeyTypeRange,
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
Expand Down
166 changes: 166 additions & 0 deletions disperser/common/blobstore/blob_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
BlobStatus: disperser.Processing,
Expiry: 0,
NumRetries: 0,
AccountID: "test",
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
Expand All @@ -43,6 +44,7 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
BlobStatus: disperser.Finalized,
Expiry: 0,
NumRetries: 0,
AccountID: "test",
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
Expand Down Expand Up @@ -123,6 +125,7 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) {
BlobStatus: disperser.Processing,
Expiry: 0,
NumRetries: 0,
AccountID: "test",
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
Expand All @@ -139,6 +142,7 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) {
BlobStatus: disperser.Finalized,
Expiry: 0,
NumRetries: 0,
AccountID: "test",
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
Expand Down Expand Up @@ -229,9 +233,13 @@ func getConfirmedMetadata(t *testing.T, metadataKey disperser.BlobKey) *disperse
BlobStatus: disperser.Confirmed,
Expiry: 0,
NumRetries: 0,
AccountID: "test",
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
BlobAuthHeader: core.BlobAuthHeader{
AccountID: "test",
},
},
RequestedAt: requestedAt,
BlobSize: blobSize,
Expand All @@ -254,3 +262,161 @@ func getConfirmedMetadata(t *testing.T, metadataKey disperser.BlobKey) *disperse
},
}
}

func TestBlobMetadataStoreWithAccountId(t *testing.T) {
ctx := context.Background()
blobKey1 := disperser.BlobKey{
BlobHash: blobHash,
MetadataHash: "hash",
}
metadata1 := &disperser.BlobMetadata{
MetadataHash: blobKey1.MetadataHash,
BlobHash: blobHash,
BlobStatus: disperser.Processing,
AccountID: "test",
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
RequestedAt: 123,
},
}
blobKey2 := disperser.BlobKey{
BlobHash: "blob2",
MetadataHash: "hash2",
}
metadata2 := &disperser.BlobMetadata{
MetadataHash: blobKey2.MetadataHash,
BlobHash: blobKey2.BlobHash,
BlobStatus: disperser.Finalized,
AccountID: "test",
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
RequestedAt: 124,
},
ConfirmationInfo: &disperser.ConfirmationInfo{},
}
err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1)
assert.NoError(t, err)
err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2)
assert.NoError(t, err)

processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Len(t, processing, 1)
assert.Equal(t, metadata1, processing[0])

processingCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Equal(t, int32(1), processingCount)

blobCountByAccountId, err := blobMetadataStore.GetBlobMetadataCountByAccountID(ctx, "test")
assert.NoError(t, err)
assert.Equal(t, int32(2), blobCountByAccountId)

deleteItems(t, []commondynamodb.Key{
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash},
},
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash},
},
})
}

func TestBlobMetadataStoreTwoDifferentAccountId(t *testing.T) {
ctx := context.Background()
blobKey1 := disperser.BlobKey{
BlobHash: blobHash,
MetadataHash: "hash",
}
blob1 := &core.Blob{
RequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
BlobAuthHeader: core.BlobAuthHeader{
AccountID: "test1",
},
},

Data: []byte("test1"),
}
metadata1 := &disperser.BlobMetadata{
MetadataHash: blobKey1.MetadataHash,
BlobHash: blobHash,
BlobStatus: disperser.Processing,
AccountID: "test1",
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob1.RequestHeader,
BlobSize: uint(len(blob1.Data)),
RequestedAt: 123,
},
}
blobKey2 := disperser.BlobKey{
BlobHash: "blob2",
MetadataHash: "hash2",
}
blob2 := &core.Blob{
RequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
BlobAuthHeader: core.BlobAuthHeader{
AccountID: "test2",
},
},

Data: []byte("test2"),
}
metadata2 := &disperser.BlobMetadata{
MetadataHash: blobKey2.MetadataHash,
BlobHash: blobKey2.BlobHash,
BlobStatus: disperser.Finalized,
AccountID: "test2",
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob2.RequestHeader,
BlobSize: uint(len(blob2.Data)),
RequestedAt: 123,
},
ConfirmationInfo: &disperser.ConfirmationInfo{},
}
err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1)
assert.NoError(t, err)
err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2)
assert.NoError(t, err)

processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Len(t, processing, 1)
assert.Equal(t, metadata1, processing[0])

processingCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Equal(t, int32(1), processingCount)

blobCountByAccountId1, err := blobMetadataStore.GetBlobMetadataCountByAccountID(ctx, "test1")
assert.NoError(t, err)
assert.Equal(t, int32(1), blobCountByAccountId1)

blobCountByAccountId2, err := blobMetadataStore.GetBlobMetadataCountByAccountID(ctx, "test2")
assert.NoError(t, err)
assert.Equal(t, int32(1), blobCountByAccountId2)

deleteItems(t, []commondynamodb.Key{
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash},
},
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash},
},
})
}
4 changes: 4 additions & 0 deletions disperser/common/blobstore/blobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ var (
blob = &core.Blob{
RequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
BlobAuthHeader: core.BlobAuthHeader{
AccountID: "test",
},
},

Data: []byte("test"),
}
s3Client = cmock.NewS3Client()
Expand Down
5 changes: 5 additions & 0 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *SharedBlobStore) StoreBlob(ctx context.Context, blob *core.Blob, reques
metadata := disperser.BlobMetadata{
BlobHash: blobHash,
MetadataHash: metadataHash,
AccountID: blob.RequestHeader.AccountID,
NumRetries: 0,
BlobStatus: disperser.Processing,
Expiry: expiry,
Expand Down Expand Up @@ -237,6 +238,10 @@ func (s *SharedBlobStore) GetBlobMetadata(ctx context.Context, metadataKey dispe
return s.blobMetadataStore.GetBlobMetadata(ctx, metadataKey)
}

func (s *SharedBlobStore) GetBlobMetadataCountByAccountID(ctx context.Context, accountID core.AccountID) (int32, error) {
return s.blobMetadataStore.GetBlobMetadataCountByAccountID(ctx, accountID)
}

func (s *SharedBlobStore) HandleBlobFailure(ctx context.Context, metadata *disperser.BlobMetadata, maxRetry uint) (bool, error) {
if metadata.NumRetries < maxRetry {
if err := s.MarkBlobProcessing(ctx, metadata.GetBlobKey()); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions disperser/common/blobstore/shared_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ func TestSharedBlobStore(t *testing.T) {
BlobStatus: disperser.Processing,
Expiry: 0,
NumRetries: 0,
AccountID: "test",
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
BlobAuthHeader: blob.RequestHeader.BlobAuthHeader,
},
RequestedAt: requestedAt,
BlobSize: blobSize,
Expand Down Expand Up @@ -138,9 +140,11 @@ func TestSharedBlobStore(t *testing.T) {
BlobStatus: disperser.Processing,
Expiry: 0,
NumRetries: 0,
AccountID: "test",
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
BlobAuthHeader: blob.RequestHeader.BlobAuthHeader,
},
RequestedAt: requestedAt,
BlobSize: blobSize2,
Expand Down
Loading
Loading