diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 468b33cc37..916207e6cb 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -101,6 +101,9 @@ func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.Payme } // ValidateQuorums ensures that the quorums listed in the blobHeader are present within allowedQuorums +// Note: A reservation that does not utilize all of the allowed quorums will be accepted. However, it +// will still charge against all of the allowed quorums. A on-demand requrests require and only allow +// the ETH and EIGEN quorums. func (m *Meterer) ValidateQuorum(headerQuorums []uint8, allowedQuorums []uint8) error { if len(headerQuorums) == 0 { return fmt.Errorf("no quorum params in blob header") @@ -130,8 +133,8 @@ func (m *Meterer) ValidateBinIndex(header core.PaymentMetadata, reservation *cor // IncrementBinUsage increments the bin usage atomically and checks for overflow // TODO: Bin limit should be direct write to the Store func (m *Meterer) IncrementBinUsage(ctx context.Context, header core.PaymentMetadata, reservation *core.ActiveReservation, blobLength uint) error { - numSymbols := uint64(max(blobLength, uint(m.MinNumSymbols))) - newUsage, err := m.OffchainStore.UpdateReservationBin(ctx, header.AccountID, uint64(header.BinIndex), numSymbols) + numSymbols := m.SymbolsCharged(blobLength) + newUsage, err := m.OffchainStore.UpdateReservationBin(ctx, header.AccountID, uint64(header.BinIndex), uint64(numSymbols)) if err != nil { return fmt.Errorf("failed to increment bin usage: %w", err) } @@ -140,7 +143,7 @@ func (m *Meterer) IncrementBinUsage(ctx context.Context, header core.PaymentMeta usageLimit := m.GetReservationBinLimit(reservation) if newUsage <= usageLimit { return nil - } else if newUsage-numSymbols >= usageLimit { + } else if newUsage-uint64(numSymbols) >= usageLimit { // metered usage before updating the size already exceeded the limit return fmt.Errorf("bin has already been filled") } diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index b55969a53b..faeead9ac4 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -103,9 +103,9 @@ func setup(_ *testing.M) { logger = logging.NewNoopLogger() config := meterer.Config{ - PricePerSymbol: 1, - MinNumSymbols: 1, - GlobalSymbolsPerSecond: 1000, + PricePerSymbol: 2, + MinNumSymbols: 3, + GlobalSymbolsPerSecond: 1009, ReservationWindow: 1, ChainReadTimeout: 3 * time.Second, } @@ -131,8 +131,8 @@ func setup(_ *testing.M) { accountID2 = crypto.PubkeyToAddress(privateKey2.PublicKey).Hex() account1Reservations = core.ActiveReservation{SymbolsPerSec: 100, StartTimestamp: now + 1200, EndTimestamp: now + 1800, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}} account2Reservations = core.ActiveReservation{SymbolsPerSec: 200, StartTimestamp: now - 120, EndTimestamp: now + 180, QuorumSplit: []byte{30, 70}, QuorumNumbers: []uint8{0, 1}} - account1OnDemandPayments = core.OnDemandPayment{CumulativePayment: 1500} - account2OnDemandPayments = core.OnDemandPayment{CumulativePayment: 1000} + account1OnDemandPayments = core.OnDemandPayment{CumulativePayment: 3864} + account2OnDemandPayments = core.OnDemandPayment{CumulativePayment: 2000} store, err := meterer.NewOffchainStore( clientConfig, @@ -203,6 +203,7 @@ func TestMetererReservations(t *testing.T) { // test bin usage metering dataLength := uint(20) + requiredLength := uint(21) // 21 should be charged for length of 20 since minNumSymbols is 3 for i := 0; i < 9; i++ { blob, header = createMetererInput(binIndex, 0, dataLength, quoromNumbers, accountID2) err = mt.MeterRequest(ctx, *blob, *header) @@ -214,10 +215,10 @@ func TestMetererReservations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, accountID2, item["AccountID"].(*types.AttributeValueMemberS).Value) assert.Equal(t, strconv.Itoa(int(binIndex)), item["BinIndex"].(*types.AttributeValueMemberN).Value) - assert.Equal(t, strconv.Itoa((i+1)*int(dataLength)), item["BinUsage"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, strconv.Itoa((i+1)*int(requiredLength)), item["BinUsage"].(*types.AttributeValueMemberN).Value) } - // frist over flow is allowed + // first over flow is allowed blob, header = createMetererInput(binIndex, 0, 25, quoromNumbers, accountID2) assert.NoError(t, err) err = mt.MeterRequest(ctx, *blob, *header) @@ -230,7 +231,8 @@ func TestMetererReservations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, accountID2, item["AccountID"].(*types.AttributeValueMemberS).Value) assert.Equal(t, strconv.Itoa(int(overflowedBinIndex)), item["BinIndex"].(*types.AttributeValueMemberN).Value) - assert.Equal(t, strconv.Itoa(int(5)), item["BinUsage"].(*types.AttributeValueMemberN).Value) + // 25 rounded up to the nearest multiple of minNumSymbols - (200-21*9) = 16 + assert.Equal(t, strconv.Itoa(int(16)), item["BinUsage"].(*types.AttributeValueMemberN).Value) // second over flow blob, header = createMetererInput(binIndex, 0, 1, quoromNumbers, accountID2) @@ -238,6 +240,10 @@ func TestMetererReservations(t *testing.T) { err = mt.MeterRequest(ctx, *blob, *header) assert.ErrorContains(t, err, "bin has already been filled") + // overwhelming bin overflow for empty bins (assuming all previous requests happened within the current reservation window) + blob, header = createMetererInput(binIndex-1, 0, 10, quoromNumbers, accountID2) + err = mt.MeterRequest(ctx, *blob, *header) + assert.NoError(t, err) // overwhelming bin overflow for empty bins (assuming all previous requests happened within the current reservation window) blob, header = createMetererInput(binIndex-1, 0, 1000, quoromNumbers, accountID2) err = mt.MeterRequest(ctx, *blob, *header) @@ -265,18 +271,18 @@ func TestMetererOnDemand(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - blob, header := createMetererInput(1, 1, 1000, quorumNumbers, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) + blob, header := createMetererInput(binIndex, 2, 1000, quorumNumbers, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) assert.NoError(t, err) err = mt.MeterRequest(ctx, *blob, *header) assert.ErrorContains(t, err, "failed to get on-demand payment by account: payment not found") // test invalid quorom ID - blob, header = createMetererInput(1, 1, 1000, []uint8{0, 1, 2}, accountID1) + blob, header = createMetererInput(binIndex, 1, 1000, []uint8{0, 1, 2}, accountID1) err = mt.MeterRequest(ctx, *blob, *header) assert.ErrorContains(t, err, "invalid quorum for On-Demand Request") // test insufficient cumulative payment - blob, header = createMetererInput(0, 1, 2000, quorumNumbers, accountID1) + blob, header = createMetererInput(binIndex, 1, 2000, quorumNumbers, accountID1) err = mt.MeterRequest(ctx, *blob, *header) assert.ErrorContains(t, err, "insufficient cumulative payment increment") // No rollback after meter request @@ -288,32 +294,39 @@ func TestMetererOnDemand(t *testing.T) { assert.Equal(t, 1, len(result)) // test duplicated cumulative payments - blob, header = createMetererInput(binIndex, uint64(100), 100, quorumNumbers, accountID2) + dataLength := uint(100) + priceCharged := mt.PaymentCharged(dataLength) + assert.Equal(t, uint64(102*mt.PricePerSymbol), priceCharged) + blob, header = createMetererInput(binIndex, priceCharged, dataLength, quorumNumbers, accountID2) err = mt.MeterRequest(ctx, *blob, *header) assert.NoError(t, err) - blob, header = createMetererInput(binIndex, uint64(100), 100, quorumNumbers, accountID2) + blob, header = createMetererInput(binIndex, priceCharged, dataLength, quorumNumbers, accountID2) err = mt.MeterRequest(ctx, *blob, *header) assert.ErrorContains(t, err, "exact payment already exists") // test valid payments for i := 1; i < 9; i++ { - blob, header = createMetererInput(binIndex, uint64(100*(i+1)), 100, quorumNumbers, accountID2) + blob, header = createMetererInput(binIndex, uint64(priceCharged)*uint64(i+1), dataLength, quorumNumbers, accountID2) err = mt.MeterRequest(ctx, *blob, *header) assert.NoError(t, err) } // test cumulative payment on-chain constraint - blob, header = createMetererInput(binIndex, 1001, 1, quorumNumbers, accountID2) + blob, header = createMetererInput(binIndex, 2023, 1, quorumNumbers, accountID2) err = mt.MeterRequest(ctx, *blob, *header) assert.ErrorContains(t, err, "invalid on-demand payment: request claims a cumulative payment greater than the on-chain deposit") // test insufficient increment in cumulative payment - blob, header = createMetererInput(binIndex, 901, 2, quorumNumbers, accountID2) + previousCumulativePayment := uint64(priceCharged) * uint64(9) + dataLength = uint(2) + priceCharged = mt.PaymentCharged(dataLength) + blob, header = createMetererInput(binIndex, previousCumulativePayment+priceCharged-1, dataLength, quorumNumbers, accountID2) err = mt.MeterRequest(ctx, *blob, *header) assert.ErrorContains(t, err, "invalid on-demand payment: insufficient cumulative payment increment") + previousCumulativePayment = previousCumulativePayment + priceCharged // test cannot insert cumulative payment in out of order - blob, header = createMetererInput(binIndex, 50, 50, quorumNumbers, accountID2) + blob, header = createMetererInput(binIndex, mt.PaymentCharged(50), 50, quorumNumbers, accountID2) err = mt.MeterRequest(ctx, *blob, *header) assert.ErrorContains(t, err, "invalid on-demand payment: breaking cumulative payment invariants") @@ -324,8 +337,9 @@ func TestMetererOnDemand(t *testing.T) { }}) assert.NoError(t, err) assert.Equal(t, numPrevRecords, len(result)) - // test failed global rate limit - blob, header = createMetererInput(binIndex, 1002, 1001, quorumNumbers, accountID1) + // test failed global rate limit (previously payment recorded: 2, global limit: 1009) + fmt.Println("need ", previousCumulativePayment+mt.PaymentCharged(1010)) + blob, header = createMetererInput(binIndex, previousCumulativePayment+mt.PaymentCharged(1010), 1010, quorumNumbers, accountID1) err = mt.MeterRequest(ctx, *blob, *header) assert.ErrorContains(t, err, "failed global rate limiting") // Correct rollback