Skip to content

Commit

Permalink
[payments] meterer core logic (#790)
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen authored Oct 22, 2024
1 parent 485f49e commit febcd95
Show file tree
Hide file tree
Showing 12 changed files with 898 additions and 145 deletions.
9 changes: 9 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,15 @@ func (c *Client) Query(ctx context.Context, tableName string, keyCondition strin
return response.Items, nil
}

// QueryWithInput is a wrapper for the Query function that allows for a custom query input
func (c *Client) QueryWithInput(ctx context.Context, input *dynamodb.QueryInput) ([]Item, error) {
response, err := c.dynamoClient.Query(ctx, input)
if err != nil {
return nil, err
}
return response.Items, nil
}

// QueryIndexCount returns the count of the items in the index that match the given key
func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) (int32, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
Expand Down
88 changes: 88 additions & 0 deletions common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,91 @@ func TestQueryIndexOrderWithLimit(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, queryResult, 30) // Should return all items
}

func TestQueryWithInput(t *testing.T) {
tableName := "ProcessingQueryWithInput"
createTable(t, tableName)

ctx := context.Background()
numItems := 30
items := make([]commondynamodb.Item, numItems)
for i := 0; i < numItems; i++ {
requestedAt := time.Now().Add(-time.Duration(i) * time.Minute).Unix()
items[i] = commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "0"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(requestedAt, 10)},
}
}
unprocessed, err := dynamoClient.PutItems(ctx, tableName, items)
assert.NoError(t, err)
assert.Len(t, unprocessed, 0)

// Test forward order with limit
queryInput := &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String("StatusIndex"),
KeyConditionExpression: aws.String("BlobStatus = :status"),
ExpressionAttributeValues: commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{Value: "0"},
},
ScanIndexForward: aws.Bool(true),
Limit: aws.Int32(10),
}
queryResult, err := dynamoClient.QueryWithInput(ctx, queryInput)
assert.NoError(t, err)
assert.Len(t, queryResult, 10)
// Check if the items are in ascending order
for i := 0; i < len(queryResult)-1; i++ {
assert.True(t, queryResult[i]["RequestedAt"].(*types.AttributeValueMemberN).Value <= queryResult[i+1]["RequestedAt"].(*types.AttributeValueMemberN).Value)
}

// Test reverse order with limit
queryInput = &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String("StatusIndex"),
KeyConditionExpression: aws.String("BlobStatus = :status"),
ExpressionAttributeValues: commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{Value: "0"},
},
ScanIndexForward: aws.Bool(false),
Limit: aws.Int32(10),
}
queryResult, err = dynamoClient.QueryWithInput(ctx, queryInput)
assert.NoError(t, err)
assert.Len(t, queryResult, 10)
// Check if the items are in descending order
for i := 0; i < len(queryResult)-1; i++ {
assert.True(t, queryResult[i]["RequestedAt"].(*types.AttributeValueMemberN).Value >= queryResult[i+1]["RequestedAt"].(*types.AttributeValueMemberN).Value)
}

// Test with a smaller limit
queryInput = &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String("StatusIndex"),
KeyConditionExpression: aws.String("BlobStatus = :status"),
ExpressionAttributeValues: commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{Value: "0"},
},
Limit: aws.Int32(5),
}
queryResult, err = dynamoClient.QueryWithInput(ctx, queryInput)
assert.NoError(t, err)
assert.Len(t, queryResult, 5)

// Test with a limit larger than the number of items
queryInput = &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String("StatusIndex"),
KeyConditionExpression: aws.String("BlobStatus = :status"),
ExpressionAttributeValues: commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{Value: "0"},
},
Limit: aws.Int32(50),
}
queryResult, err = dynamoClient.QueryWithInput(ctx, queryInput)
assert.NoError(t, err)
assert.Len(t, queryResult, 30) // Should return all items
}
24 changes: 17 additions & 7 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ type Blob struct {
Data []byte
}

func (b *Blob) GetQuorumNumbers() []uint8 {
quorumNumbers := make([]uint8, 0, len(b.RequestHeader.SecurityParams))
for _, sp := range b.RequestHeader.SecurityParams {
quorumNumbers = append(quorumNumbers, sp.QuorumID)
}
return quorumNumbers
}

// BlobAuthHeader contains the data that a user must sign to authenticate a blob request.
// Signing the combination of the Nonce and the BlobCommitments prohibits the disperser from
// using the signature to charge the user for a different blob or for dispersing the same blob
Expand Down Expand Up @@ -482,22 +490,24 @@ type PaymentMetadata struct {
BinIndex uint32
// TODO: we are thinking the contract can use uint128 for cumulative payment,
// but the definition on v2 uses uint64. Double check with team.
CumulativePayment uint64
CumulativePayment *big.Int
}

// Hash returns the Keccak256 hash of the PaymentMetadata
func (pm *PaymentMetadata) Hash() []byte {
// Create a byte slice to hold the serialized data
data := make([]byte, 0, len(pm.AccountID)+12)
data := make([]byte, 0, len(pm.AccountID)+4+pm.CumulativePayment.BitLen()/8+1)

// Append AccountID
data = append(data, []byte(pm.AccountID)...)

// Append BinIndex
binIndexBytes := make([]byte, 4)
binary.BigEndian.PutUint32(binIndexBytes, pm.BinIndex)
data = append(data, binIndexBytes...)

paymentBytes := make([]byte, 8)
binary.BigEndian.PutUint64(paymentBytes, pm.CumulativePayment)
// Append CumulativePayment
paymentBytes := pm.CumulativePayment.Bytes()
data = append(data, paymentBytes...)

return crypto.Keccak256(data)
Expand All @@ -506,12 +516,12 @@ func (pm *PaymentMetadata) Hash() []byte {
// OperatorInfo contains information about an operator which is stored on the blockchain state,
// corresponding to a particular quorum
type ActiveReservation struct {
DataRate uint64 // Bandwidth per reservation bin
SymbolsPerSec uint64 // reserve number of symbols per second
StartTimestamp uint64 // Unix timestamp that's valid for basically eternity
EndTimestamp uint64

QuorumNumbers []uint8
QuorumSplit []byte // ordered mapping of quorum number to payment split; on-chain validation should ensure split <= 100
QuorumNumbers []uint8 // allowed quorums
QuorumSplit []byte // ordered mapping of quorum number to payment split; on-chain validation should ensure split <= 100
}

type OnDemandPayment struct {
Expand Down
14 changes: 12 additions & 2 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,16 +763,26 @@ func (t *Transactor) GetRequiredQuorumNumbers(ctx context.Context, blockNumber u
return requiredQuorums, nil
}

func (t *Transactor) GetActiveReservations(ctx context.Context, blockNumber uint32) (map[string]core.ActiveReservation, error) {
func (t *Transactor) GetActiveReservations(ctx context.Context, blockNumber uint32, accountIDs []string) (map[string]core.ActiveReservation, error) {
// contract is not implemented yet
return map[string]core.ActiveReservation{}, nil
}

func (t *Transactor) GetOnDemandPayments(ctx context.Context, blockNumber uint32) (map[string]core.OnDemandPayment, error) {
func (t *Transactor) GetActiveReservationByAccount(ctx context.Context, blockNumber uint32, accountID string) (core.ActiveReservation, error) {
// contract is not implemented yet
return core.ActiveReservation{}, nil
}

func (t *Transactor) GetOnDemandPayments(ctx context.Context, blockNumber uint32, accountIDs []string) (map[string]core.OnDemandPayment, error) {
// contract is not implemented yet
return map[string]core.OnDemandPayment{}, nil
}

func (t *Transactor) GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint32, accountID string) (core.OnDemandPayment, error) {
// contract is not implemented yet
return core.OnDemandPayment{}, nil
}

func (t *Transactor) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDAServiceManagerAddr gethcommon.Address) error {

contractEigenDAServiceManager, err := eigendasrvmg.NewContractEigenDAServiceManager(eigenDAServiceManagerAddr, t.EthClient)
Expand Down
Loading

0 comments on commit febcd95

Please sign in to comment.