This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Feature/atomic chaining #159

Closed
wants to merge 7 commits
79 changes: 61 additions & 18 deletions algorithms.go
@@ -178,32 +178,45 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// If we are already at the limit.
if rl.Remaining == 0 && r.Hits > 0 {
span.AddEvent("Already over the limit")
overLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
t.Status = rl.Status
if !r.AtomicCheck {
overLimitCounter.Add(1)
}
return rl, nil

}

// If requested hits takes the remainder.
if t.Remaining == r.Hits {
span.AddEvent("At the limit")
t.Remaining = 0
rl.Remaining = 0
if !r.AtomicCheck {
t.Remaining = 0
}
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the cache.
if r.Hits > t.Remaining {
span.AddEvent("Over the limit")
overLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
if !r.AtomicCheck {
overLimitCounter.Add(1)
}
return rl, nil
}

span.AddEvent("Under the limit")
t.Remaining -= r.Hits
rl.Remaining = t.Remaining
// If we are doing a check during atomic chaining, we only want to
// update the response and not the actual value for the rate limit.
if r.AtomicCheck {
rl.Remaining = t.Remaining - r.Hits
} else {
t.Remaining -= r.Hits
rl.Remaining = t.Remaining
}
return rl, nil
}

@@ -228,6 +241,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
Remaining: r.Limit - r.Hits,
CreatedAt: now,
}

item := &CacheItem{
Algorithm: Algorithm_TOKEN_BUCKET,
Key: r.HashKey(),
@@ -251,13 +265,22 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
ResetTime: expire,
}

// If we are doing a check during atomic chaining, the remaining number is actually the limit,
// but we should respond with what the value would be as if it had been applied.
if r.AtomicCheck {
t.Remaining = r.Limit
rl.Remaining = r.Limit - r.Hits
}

Contributor

I would like to see a separation of concerns around checking a bucket vs. updating the bucket, because now there are two places where the bucket's counters are updated as a shared responsibility. Can we have the update logic pulled out and called by gubernator's behavior logic and not by tokenBucket/leakyBucket?

Author

Yeah, that's a good point. I will move the logic to persist hits into a new function.

Author

Update: I've decoupled the update logic from the actual algorithms into a new function which is invoked from peer_client.go, following the actual "check" using the algorithm, in the case the request is not part of an atomic chain. Hopefully this is the kind of separation you were thinking of. I have also extracted the cache and store loading logic into a common function.
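To picture the shape the author describes, here is a minimal, self-contained sketch of a check phase followed by an apply phase. Every name in it (checkTokenBucket, applyTokenBucket, bucket, rateLimitReq) is a hypothetical stand-in for illustration only; it is not the PR's actual implementation.

// A sketch of separating "check" from "apply" for an atomic chain.
// All types and function names are simplified stand-ins, not repo code.
package main

import "fmt"

type rateLimitReq struct {
	Hits  int64
	Limit int64
}

type bucket struct {
	Remaining int64
}

type rateLimitResp struct {
	Remaining int64
	OverLimit bool
}

// checkTokenBucket computes the would-be response without mutating the bucket.
func checkTokenBucket(b *bucket, r *rateLimitReq) rateLimitResp {
	if r.Hits > b.Remaining {
		return rateLimitResp{Remaining: b.Remaining, OverLimit: true}
	}
	return rateLimitResp{Remaining: b.Remaining - r.Hits}
}

// applyTokenBucket persists the hits; in the author's description this step is
// invoked separately (e.g. from peer_client.go) once the caller has decided
// the whole chain may proceed.
func applyTokenBucket(b *bucket, r *rateLimitReq) {
	b.Remaining -= r.Hits
}

func main() {
	// Two independent buckets, as in an atomic chain over two rate limits.
	buckets := []*bucket{{Remaining: 2}, {Remaining: 10}}
	reqs := []*rateLimitReq{{Hits: 1, Limit: 2}, {Hits: 11, Limit: 10}}

	// Phase 1: check every request in the chain without mutating state.
	ok := true
	for i, r := range reqs {
		if checkTokenBucket(buckets[i], r).OverLimit {
			ok = false
		}
	}

	// Phase 2: apply hits only if the whole chain stayed under its limits.
	if ok {
		for i, r := range reqs {
			applyTokenBucket(buckets[i], r)
		}
	}

	fmt.Println(buckets[0].Remaining, buckets[1].Remaining) // 2 10 — nothing was applied
}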

// Client could be requesting that we always return OVER_LIMIT.

if r.Hits > r.Limit {
span.AddEvent("Over the limit")
overLimitCounter.Add(1)
if !r.AtomicCheck {
overLimitCounter.Add(1)
t.Remaining = r.Limit
}
rl.Status = Status_OVER_LIMIT
rl.Remaining = r.Limit
t.Remaining = r.Limit
}

c.Add(item)
@@ -380,7 +403,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
duration = expire - (n.UnixNano() / 1000000)
}

if r.Hits != 0 {
if r.Hits != 0 && !r.AtomicCheck {
c.UpdateExpiration(r.HashKey(), now+duration)
}

@@ -415,23 +438,29 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If we are already at the limit
if int64(b.Remaining) == 0 && r.Hits > 0 {
overLimitCounter.Add(1)
if !r.AtomicCheck {
overLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
return rl, nil
}

// If requested hits takes the remainder
if int64(b.Remaining) == r.Hits {
b.Remaining -= float64(r.Hits)
if !r.AtomicCheck {
b.Remaining -= float64(r.Hits)
rl.ResetTime = now + (rl.Limit-int64(b.Remaining))*int64(rate)
}
rl.Remaining = 0
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
if r.Hits > int64(b.Remaining) {
overLimitCounter.Add(1)
if !r.AtomicCheck {
overLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
return rl, nil
}
@@ -441,8 +470,14 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
return rl, nil
}

b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
// If we are doing a check during atomic chaining, we only want to
// update the response and not the actual value for the rate limit.
if r.AtomicCheck {
rl.Remaining = int64(b.Remaining - float64(r.Hits))
} else {
b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
}
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}
@@ -488,13 +523,21 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
ResetTime: now + (b.Limit-(r.Burst-r.Hits))*int64(rate),
}

// If we are doing a check during atomic chaining, the remaining number is actually the burst limit,
// but we should respond with what the value would be as if it had been applied.
if r.AtomicCheck {
b.Remaining = float64(r.Burst)
rl.Remaining = r.Burst - r.Hits
}

// Client could be requesting that we start with the bucket OVER_LIMIT
if r.Hits > r.Burst {
overLimitCounter.Add(1)
if !r.AtomicCheck {
overLimitCounter.Add(1)
b.Remaining = 0
}
rl.Status = Status_OVER_LIMIT
rl.Remaining = 0
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
b.Remaining = 0
}

item := &CacheItem{
174 changes: 173 additions & 1 deletion functional_test.go
@@ -1218,7 +1218,179 @@ func TestGetPeerRateLimits(t *testing.T) {
})
}

// TODO: Add a test for sending no rate limits RateLimitReqList.RateLimits = nil
// Test multiple rate limits with atomic chaining enabled:
// should behave as normal if a rate limit is not exceeded
// should fail without incrementing any of the rate limits if one of them is exceeded
func TestAtomicChainingMultipleLimits(t *testing.T) {
// If the consistent hash changes or the number of peers changes, this might
// need to be changed. We want the test to forward both rate limits to other
// nodes in the cluster.

t.Logf("Asking Peer: %s", cluster.GetPeers()[0].GRPCAddress)
client, errs := guber.DialV1Server(cluster.GetPeers()[0].GRPCAddress, nil)
require.Nil(t, errs)

// Send 2 different requests with atomic chaining configured; the second one starts over the limit.
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_atomic_chaining",
UniqueKey: "account:9234",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Duration: guber.Second * 9,
Limit: 2,
Hits: 1,
Behavior: 0,
},
{
Name: "test_atomic_chaining",
UniqueKey: "account:5678",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.Second * 9,
Limit: 10,
Hits: 11,
Behavior: 0,
},
},
Behavior: guber.UnionBehavior_ATOMIC_REQUESTS,
})
require.Nil(t, err)

require.Len(t, resp.Responses, 2)

rl := resp.Responses[0]
assert.Equal(t, guber.Status_UNDER_LIMIT, rl.Status)
assert.Equal(t, int64(1), rl.Remaining)
assert.Equal(t, int64(2), rl.Limit)

rl = resp.Responses[1]
assert.Equal(t, guber.Status_OVER_LIMIT, rl.Status)
assert.Equal(t, int64(10), rl.Limit)

// Fire the requests again but with fewer hits on the second; both should now consume from their respective limits.
resp, err = client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_atomic_chaining",
UniqueKey: "account:9234",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Duration: guber.Second * 9,
Limit: 2,
Hits: 1,
Behavior: 0,
},
{
Name: "test_atomic_chaining",
UniqueKey: "account:5678",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.Second * 9,
Limit: 10,
Hits: 5,
Behavior: 0,
},
},
Behavior: guber.UnionBehavior_ATOMIC_REQUESTS,
})
require.Nil(t, err)

require.Len(t, resp.Responses, 2)

rl = resp.Responses[0]
assert.Equal(t, guber.Status_UNDER_LIMIT, rl.Status)
assert.Equal(t, int64(1), rl.Remaining)
assert.Equal(t, int64(2), rl.Limit)

rl = resp.Responses[1]
assert.Equal(t, guber.Status_UNDER_LIMIT, rl.Status)
assert.Equal(t, int64(5), rl.Remaining)
assert.Equal(t, int64(10), rl.Limit)

// Exceed the first rate limit; the second should not be impacted.
resp, err = client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_atomic_chaining",
UniqueKey: "account:9234",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Duration: guber.Second * 9,
Limit: 2,
Hits: 2,
Behavior: 0,
},
{
Name: "test_atomic_chaining",
UniqueKey: "account:5678",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.Second * 9,
Limit: 10,
Hits: 1,
Behavior: 0,
},
},
Behavior: guber.UnionBehavior_ATOMIC_REQUESTS,
})
require.Nil(t, err)

require.Len(t, resp.Responses, 2)

rl = resp.Responses[0]
assert.Equal(t, guber.Status_OVER_LIMIT, rl.Status)
assert.Equal(t, int64(2), rl.Limit)

rl = resp.Responses[1]
assert.Equal(t, guber.Status_UNDER_LIMIT, rl.Status)
assert.Equal(t, int64(4), rl.Remaining)
assert.Equal(t, int64(10), rl.Limit)

// Send another atomic pair; the second won't have been modified by the previous request, as one of the limits was exceeded.
resp, err = client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_atomic_chaining",
UniqueKey: "account:9234",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Duration: guber.Second * 9,
Limit: 2,
Hits: 1,
Behavior: 0,
},
{
Name: "test_atomic_chaining",
UniqueKey: "account:5678",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Duration: guber.Second * 9,
Limit: 10,
Hits: 1,
Behavior: 0,
},
},
Behavior: guber.UnionBehavior_ATOMIC_REQUESTS,
})
require.Nil(t, err)

require.Len(t, resp.Responses, 2)

rl = resp.Responses[0]
assert.Equal(t, guber.Status_UNDER_LIMIT, rl.Status)
assert.Equal(t, int64(0), rl.Remaining)
assert.Equal(t, int64(2), rl.Limit)

rl = resp.Responses[1]
assert.Equal(t, guber.Status_UNDER_LIMIT, rl.Status)
assert.Equal(t, int64(4), rl.Remaining)
assert.Equal(t, int64(10), rl.Limit)
}

func TestRequestNoRateLimits(t *testing.T) {
// test for sending no rate limits RateLimitReqList.RateLimits = nil
client, errs := guber.DialV1Server(cluster.GetPeers()[0].GRPCAddress, nil)
require.Nil(t, errs)
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{},
})
require.Nil(t, err)
assert.Equal(t, 0, len(resp.Responses))
}

func getMetric(t testutil.TestingT, in io.Reader, name string) *model.Sample {
dec := expfmt.SampleDecoder{
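For quick reference, here is the client call pattern exercised by the new test, condensed into a standalone sketch. The import path, server address, and rate limit names are illustrative assumptions; the guber client calls, fields, and behavior flag are taken from the test above.

package main

import (
	"context"
	"log"

	guber "github.com/mailgun/gubernator/v2" // import path assumed; adjust to your gubernator version
)

func main() {
	// Address is illustrative; the test dials a peer from its local test cluster.
	client, err := guber.DialV1Server("127.0.0.1:1051", nil)
	if err != nil {
		log.Fatal(err)
	}

	// Two rate limits checked as one atomic chain: if any request in the
	// chain is over its limit, none of them consume hits.
	resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
		Requests: []*guber.RateLimitReq{
			{
				Name:      "orders_per_account", // illustrative name
				UniqueKey: "account:9234",
				Algorithm: guber.Algorithm_TOKEN_BUCKET,
				Duration:  guber.Second * 9,
				Limit:     2,
				Hits:      1,
			},
			{
				Name:      "emails_per_account", // illustrative name
				UniqueKey: "account:5678",
				Algorithm: guber.Algorithm_LEAKY_BUCKET,
				Duration:  guber.Second * 9,
				Limit:     10,
				Hits:      1,
			},
		},
		Behavior: guber.UnionBehavior_ATOMIC_REQUESTS,
	})
	if err != nil {
		log.Fatal(err)
	}
	for _, rl := range resp.Responses {
		log.Printf("status=%v remaining=%d limit=%d", rl.Status, rl.Remaining, rl.Limit)
	}
}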
2 changes: 1 addition & 1 deletion global.go
@@ -129,7 +129,6 @@ func (gm *globalManager) sendHits(ctx context.Context, hits map[string]*RateLimi
peerRequests := make(map[string]*pair)
start := clock.Now()

// Assign each request to a peer
for _, r := range hits {
peer, err := gm.instance.GetPeer(ctx, r.HashKey())
if err != nil {
@@ -214,6 +213,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
SetBehavior(&rl.Behavior, Behavior_GLOBAL, false)
rl.Hits = 0

// we can invoke with atomicCheck = false as we are just getting status by using hits = 0
status, err := gm.instance.getRateLimit(ctx, rl)
if err != nil {
gm.log.WithError(err).Errorf("while broadcasting update to peers for: '%s'", rl.HashKey())