Skip to content

Commit

Permalink
feat: meterer helper types and structs
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 8, 2024
1 parent 1692ab3 commit d12b4a2
Show file tree
Hide file tree
Showing 11 changed files with 1,383 additions and 4 deletions.
2 changes: 1 addition & 1 deletion common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (

type Item = map[string]types.AttributeValue
type Key = map[string]types.AttributeValue
type ExpresseionValues = map[string]types.AttributeValue
type ExpresseionValues = map[string]types.AttributeValue // is this a typo? ExpressionValues?

type QueryResult struct {
Items []Item
Expand Down
4 changes: 2 additions & 2 deletions core/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (c *StdAssignmentCoordinator) ValidateChunkLength(state *OperatorState, blo
denom := new(big.Int).Mul(big.NewInt(int64(info.ConfirmationThreshold-info.AdversaryThreshold)), totalStake)
maxChunkLength := uint(roundUpDivideBig(num, denom).Uint64())

maxChunkLength2 := roundUpDivide(2*blobLength*percentMultiplier, MaxRequiredNumChunks*uint(info.ConfirmationThreshold-info.AdversaryThreshold))
maxChunkLength2 := RoundUpDivide(2*blobLength*percentMultiplier, MaxRequiredNumChunks*uint(info.ConfirmationThreshold-info.AdversaryThreshold))

if maxChunkLength < maxChunkLength2 {
maxChunkLength = maxChunkLength2
Expand Down Expand Up @@ -271,7 +271,7 @@ func roundUpDivideBig(a, b *big.Int) *big.Int {

}

func roundUpDivide(a, b uint) uint {
func RoundUpDivide(a, b uint) uint {
return (a + b - 1) / b

}
Expand Down
2 changes: 1 addition & 1 deletion core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (b *BlobHeader) EncodedSizeAllQuorums() int64 {
size := int64(0)
for _, quorum := range b.QuorumInfos {

size += int64(roundUpDivide(b.Length*percentMultiplier*encoding.BYTES_PER_SYMBOL, uint(quorum.ConfirmationThreshold-quorum.AdversaryThreshold)))
size += int64(RoundUpDivide(b.Length*percentMultiplier*encoding.BYTES_PER_SYMBOL, uint(quorum.ConfirmationThreshold-quorum.AdversaryThreshold)))
}
return size
}
Expand Down
64 changes: 64 additions & 0 deletions core/meterer/meterer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package meterer

import (
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
)

type TimeoutConfig struct {
ChainReadTimeout time.Duration
ChainWriteTimeout time.Duration
ChainStateTimeout time.Duration
TxnBroadcastTimeout time.Duration
}

// network parameters (this should be published on-chain and read through contracts)
type Config struct {
GlobalBytesPerSecond uint64 // 2^64 bytes ~= 18 exabytes per second; if we use uint32, that's ~4GB/s
PricePerChargeable uint32 // 2^64 gwei ~= 18M Eth; uint32 => ~4ETH
MinChargeableSize uint32
ReservationWindow uint32
}

// disperser API server will receive requests from clients. these requests will be with a blobHeader with payments information (CumulativePayments, BinIndex, and Signature)
// Disperser will pass the blob header to the meterer, which will check if the payments information is valid. if it is, it will be added to the meterer's state.
// To check if the payment is valid, the meterer will:
// 1. check if the signature is valid
// (against the CumulativePayments and BinIndex fields ;
// maybe need something else to secure against using this appraoch for reservations when rev request comes in same bin interval; say that nonce is signed over as well)
// 2. For reservations, check offchain bin state as demonstrated in pseudocode, also check onchain state before rejecting (since onchain data is pulled)
// 3. For on-demand, check against payments and the global rates, similar to the reservation case
//
// If the payment is valid, the meterer will add the blob header to its state and return a success response to the disperser API server.
// if any of the checks fail, the meterer will return a failure response to the disperser API server.
var OnDemandQuorumNumbers = []uint8{0, 1}

type Meterer struct {
Config
TimeoutConfig

ChainState *OnchainPaymentState
OffchainStore *OffchainStore

logger logging.Logger
}

func NewMeterer(
config Config,
timeoutConfig TimeoutConfig,
paymentChainState *OnchainPaymentState,
offchainStore *OffchainStore,
logger logging.Logger,
) (*Meterer, error) {
// TODO: create a separate thread to pull from the chain and update chain state
return &Meterer{
Config: config,
TimeoutConfig: timeoutConfig,

ChainState: paymentChainState,
OffchainStore: offchainStore,

logger: logger.With("component", "Meterer"),
}, nil
}
293 changes: 293 additions & 0 deletions core/meterer/offchain_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package meterer

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

commonaws "github.com/Layr-Labs/eigenda/common/aws"
commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

type OffchainStore struct {
dynamoClient *commondynamodb.Client
reservationTableName string
onDemandTableName string
globalBinTableName string
logger logging.Logger
// TODO: add maximum storage for both tables
}

func NewOffchainStore(
cfg commonaws.ClientConfig,
reservationTableName string,
onDemandTableName string,
globalBinTableName string,
logger logging.Logger,
) (*OffchainStore, error) {

dynamoClient, err := commondynamodb.NewClient(cfg, logger)
if err != nil {
return nil, err
}
if reservationTableName == "" || onDemandTableName == "" || globalBinTableName == "" {
return nil, fmt.Errorf("table names cannot be empty")
}

err = CreateReservationTable(cfg, reservationTableName)
if err != nil && !strings.Contains(err.Error(), "Table already exists") {
fmt.Println("Error creating reservation table:", err)
return nil, err
}
err = CreateGlobalReservationTable(cfg, globalBinTableName)
if err != nil && !strings.Contains(err.Error(), "Table already exists") {
fmt.Println("Error creating global bin table:", err)
return nil, err
}
err = CreateOnDemandTable(cfg, onDemandTableName)
if err != nil && !strings.Contains(err.Error(), "Table already exists") {
fmt.Println("Error creating on-demand table:", err)
return nil, err
}
//TODO: add a separate thread to periodically clean up the tables
// delete expired reservation bins (<i-1) and old on-demand payments (retain max N payments)
return &OffchainStore{
dynamoClient: dynamoClient,
reservationTableName: reservationTableName,
onDemandTableName: onDemandTableName,
globalBinTableName: globalBinTableName,
logger: logger,
}, nil
}

type ReservationBin struct {
AccountID string
BinIndex uint32
BinUsage uint32
UpdatedAt time.Time
}

type PaymentTuple struct {
CumulativePayment uint64
DataLength uint32
}

type GlobalBin struct {
BinIndex uint32
BinUsage uint64
UpdatedAt time.Time
}

func (s *OffchainStore) UpdateReservationBin(ctx context.Context, accountID string, binIndex uint64, size uint64) (uint64, error) {
key := map[string]types.AttributeValue{
"AccountID": &types.AttributeValueMemberS{Value: accountID},
"BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)},
}

update := map[string]types.AttributeValue{
"BinUsage": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(size), 10)},
}

fmt.Println("increment the item in a table", s.reservationTableName)
res, err := s.dynamoClient.UpdateItemIncrement(ctx, s.reservationTableName, key, update)
if err != nil {
return 0, fmt.Errorf("failed to increment bin usage: %w", err)
}

binUsage, ok := res["BinUsage"]
if !ok {
return 0, errors.New("BinUsage is not present in the response")
}

binUsageAttr, ok := binUsage.(*types.AttributeValueMemberN)
if !ok {
return 0, fmt.Errorf("unexpected type for BinUsage: %T", binUsage)
}

binUsageValue, err := strconv.ParseUint(binUsageAttr.Value, 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse BinUsage: %w", err)
}

return binUsageValue, nil
}

func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, size uint32) (uint64, error) {
key := map[string]types.AttributeValue{
"BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)},
}

update := map[string]types.AttributeValue{
"BinUsage": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(size), 10)},
}
res, err := s.dynamoClient.UpdateItemIncrement(ctx, s.globalBinTableName, key, update)
if err != nil {
return 0, err
}

binUsage, ok := res["BinUsage"]
if !ok {
return 0, nil
}

binUsageAttr, ok := binUsage.(*types.AttributeValueMemberN)
if !ok {
return 0, nil
}

binUsageValue, err := strconv.ParseUint(binUsageAttr.Value, 10, 32)
if err != nil {
return 0, err
}

return binUsageValue, nil
}

func (s *OffchainStore) FindReservationBin(ctx context.Context, accountID string, binIndex uint64) (*ReservationBin, error) {
key := map[string]types.AttributeValue{
"AccountID": &types.AttributeValueMemberS{Value: accountID},
"BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)},
}

result, err := s.dynamoClient.GetItem(ctx, s.reservationTableName, key)
if err != nil {
return nil, err
}

if result == nil {
return nil, errors.New("reservation not found")
}

var reservation ReservationBin
err = attributevalue.UnmarshalMap(result, &reservation)
if err != nil {
return nil, err
}

return &reservation, nil
}

// Find all reservation bins for a given account
func (s *OffchainStore) FindReservationBins(ctx context.Context, accountID string) ([]ReservationBin, error) {
result, err := s.dynamoClient.QueryIndex(ctx, s.reservationTableName, "AccountIDIndex", "AccountID = :accountID", commondynamodb.ExpresseionValues{
":accountID": &types.AttributeValueMemberS{Value: accountID},
})
if err != nil {
return nil, err
}

if result == nil {
return nil, errors.New("reservation not found")
}

var reservations []ReservationBin
err = attributevalue.UnmarshalListOfMaps(result, &reservations)
if err != nil {
return nil, err
}

return reservations, nil
}

func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, blobHeader BlobHeader, blobSizeCharged uint32) error {
result, err := s.dynamoClient.GetItem(ctx, s.onDemandTableName,
commondynamodb.Item{
"AccountID": &types.AttributeValueMemberS{Value: blobHeader.AccountID},
"CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(blobHeader.CumulativePayment, 10)},
},
)
if err != nil {
fmt.Println("new payment record: %w", err)
}
if result != nil {
return fmt.Errorf("exact payment already exists")
}
err = s.dynamoClient.PutItem(ctx, s.onDemandTableName,
commondynamodb.Item{
"AccountID": &types.AttributeValueMemberS{Value: blobHeader.AccountID},
"CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(blobHeader.CumulativePayment, 10)},
"DataLength": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(blobSizeCharged), 10)},
},
)

if err != nil {
return fmt.Errorf("failed to add payment: %w", err)
}
return nil
}

// RemoveOnDemandPayment removes a specific payment from the list for a specific account
func (s *OffchainStore) RemoveOnDemandPayment(ctx context.Context, accountID string, payment uint64) error {
err := s.dynamoClient.DeleteItem(ctx, s.onDemandTableName,
commondynamodb.Key{
"AccountID": &types.AttributeValueMemberS{Value: accountID},
"CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(payment, 10)},
},
)

if err != nil {
return fmt.Errorf("failed to remove payment: %w", err)
}

return nil
}

// relevant on-demand payment records: previous cumulative payment, next cumulative payment, blob size of next payment
func (s *OffchainStore) GetRelevantOnDemandRecords(ctx context.Context, accountID string, cumulativePayment uint64) (uint64, uint64, uint32, error) {
// Fetch the largest entry smaller than the given cumulativePayment
smallerResult, err := s.dynamoClient.QueryIndexOrderWithLimit(ctx, s.onDemandTableName, "AccountIDIndex",
"AccountID = :account AND CumulativePayments < :cumulativePayment",
commondynamodb.ExpresseionValues{
":account": &types.AttributeValueMemberS{Value: accountID},
":cumulativePayment": &types.AttributeValueMemberN{Value: strconv.FormatUint(cumulativePayment, 10)},
},
false, // Retrieve results in descending order for the largest smaller amount
1,
)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to query smaller payments for account: %w", err)
}

var prevPayment uint64
if len(smallerResult) > 0 {
prevPayment, err = strconv.ParseUint(smallerResult[0]["CumulativePayments"].(*types.AttributeValueMemberN).Value, 10, 64)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to parse previous payment: %w", err)
}
}

// Fetch the smallest entry larger than the given cumulativePayment
largerResult, err := s.dynamoClient.QueryIndexOrderWithLimit(ctx, s.onDemandTableName, "AccountIDIndex",
"AccountID = :account AND CumulativePayments > :cumulativePayment",
commondynamodb.ExpresseionValues{
":account": &types.AttributeValueMemberS{Value: accountID},
":cumulativePayment": &types.AttributeValueMemberN{Value: strconv.FormatUint(cumulativePayment, 10)},
},
true, // Retrieve results in ascending order for the smallest greater amount
1,
)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to query the next payment for account: %w", err)
}
var nextPayment uint64
var nextDataLength uint32
if len(largerResult) > 0 {
nextPayment, err = strconv.ParseUint(largerResult[0]["CumulativePayments"].(*types.AttributeValueMemberN).Value, 10, 64)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to parse next payment: %w", err)
}
dataLength, err := strconv.ParseUint(largerResult[0]["DataLength"].(*types.AttributeValueMemberN).Value, 10, 32)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to parse blob size: %w", err)
}
nextDataLength = uint32(dataLength)
}

return prevPayment, nextPayment, nextDataLength, nil
}
Loading

0 comments on commit d12b4a2

Please sign in to comment.