Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Performance improvements in GetRateLimits() #93

Merged
merged 1 commit into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.0.0-rc.5] - 2021-06-03
## Changes
* Implemented performance recommendations reported in Issue #74

## [2.0.0-rc.4] - 2021-06-03
## Changes
* Add support for burst in leaky bucket #103
* Add working example of aws ecs service discovery deployment #102

## [2.0.0-rc.2] - 2021-07-11
## Changes
* Deprecated github.com/golang/protobuf was replaced with google.golang.org/protobuf
Expand Down
31 changes: 29 additions & 2 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,33 @@ func BenchmarkServer_GetRateLimit(b *testing.B) {
})
}

func BenchmarkServer_GetRateLimitGlobal(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}

b.Run("GetRateLimitGlobal", func(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "get_rate_limit_benchmark",
UniqueKey: guber.RandomString(10),
Behavior: guber.Behavior_GLOBAL,
Limit: 10,
Duration: guber.Second * 5,
Hits: 1,
},
},
})
if err != nil {
b.Errorf("client.RateLimit() err: %s", err)
}
}
})
}

func BenchmarkServer_Ping(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
if err != nil {
Expand Down Expand Up @@ -107,13 +134,13 @@ func BenchmarkServer_Ping(b *testing.B) {
}
}*/

func BenchmarkServer_ThunderingHeard(b *testing.B) {
func BenchmarkServer_ThunderingHerd(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}

b.Run("ThunderingHeard", func(b *testing.B) {
b.Run("ThunderingHerd", func(b *testing.B) {
fan := syncutil.NewFanOut(100)
for n := 0; n < b.N; n++ {
fan.Run(func(o interface{}) error {
Expand Down
217 changes: 127 additions & 90 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,116 +114,153 @@ func (s *V1Instance) Close() error {
// rate limit `Name` and `UniqueKey` is not owned by this instance then we forward the request to the
// peer that does.
func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*GetRateLimitsResp, error) {
var resp GetRateLimitsResp
if len(r.Requests) > maxBatchSize {
return nil, status.Errorf(codes.OutOfRange,
"Requests.RateLimits list too large; max size is '%d'", maxBatchSize)
}

type InOut struct {
In *RateLimitReq
Idx int
Out *RateLimitResp
resp := GetRateLimitsResp{
Responses: make([]*RateLimitResp, len(r.Requests)),
}

// Asynchronously fetch rate limits
out := make(chan InOut)
go func() {
fan := syncutil.NewFanOut(1000)
// For each item in the request body
for i, item := range r.Requests {
fan.Run(func(data interface{}) error {
inOut := data.(InOut)

globalKey := inOut.In.Name + "_" + inOut.In.UniqueKey
var peer *PeerClient
var err error

if len(inOut.In.UniqueKey) == 0 {
inOut.Out = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
out <- inOut
return nil
}
var wg sync.WaitGroup
asyncCh := make(chan AsyncResp, len(r.Requests))

if len(inOut.In.Name) == 0 {
inOut.Out = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
out <- inOut
return nil
}
// For each item in the request body
for i, req := range r.Requests {
key := req.Name + "_" + req.UniqueKey
var peer *PeerClient
var err error

var attempts int
getPeer:
if attempts > 5 {
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("GetPeer() keeps returning peers that are not connected for '%s' - '%s'", globalKey, err),
}
out <- inOut
return nil
}
if len(req.UniqueKey) == 0 {
resp.Responses[i] = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
continue
}

if len(req.Name) == 0 {
resp.Responses[i] = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
continue
}

peer, err = s.GetPeer(key)
if err != nil {
resp.Responses[i] = &RateLimitResp{
Error: fmt.Sprintf("while finding peer that owns rate limit '%s' - '%s'", key, err),
}
continue
}

peer, err = s.GetPeer(globalKey)
// If our server instance is the owner of this rate limit
if peer.Info().IsOwner {
// Apply our rate limit algorithm to the request
resp.Responses[i], err = s.getRateLimit(req)
if err != nil {
resp.Responses[i] = &RateLimitResp{
Error: fmt.Sprintf("while applying rate limit for '%s' - '%s'", key, err),
}
}
} else {
if HasBehavior(req.Behavior, Behavior_GLOBAL) {
resp.Responses[i], err = s.getGlobalRateLimit(req)
if err != nil {
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("while finding peer that owns rate limit '%s' - '%s'", globalKey, err),
}
out <- inOut
return nil
resp.Responses[i] = &RateLimitResp{Error: err.Error()}
}

// If our server instance is the owner of this rate limit
if peer.Info().IsOwner {
// Apply our rate limit algorithm to the request
inOut.Out, err = s.getRateLimit(inOut.In)
if err != nil {
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("while applying rate limit for '%s' - '%s'", globalKey, err),
}
}
} else {
if HasBehavior(inOut.In.Behavior, Behavior_GLOBAL) {
inOut.Out, err = s.getGlobalRateLimit(inOut.In)
if err != nil {
inOut.Out = &RateLimitResp{Error: err.Error()}
}

// Inform the client of the owner key of the key
inOut.Out.Metadata = map[string]string{"owner": peer.Info().GRPCAddress}

out <- inOut
return nil
}
// Inform the client of the owner key of the key
resp.Responses[i].Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
continue
}

// Make an RPC call to the peer that owns this rate limit
inOut.Out, err = peer.GetPeerRateLimit(ctx, inOut.In)
if err != nil {
if IsNotReady(err) {
attempts++
goto getPeer
}
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("while fetching rate limit '%s' from peer - '%s'", globalKey, err),
}
}
wg.Add(1)
go s.asyncRequests(ctx, &AsyncReq{
AsyncCh: asyncCh,
Peer: peer,
Req: req,
WG: &wg,
Key: key,
})
}
}

// Inform the client of the owner key of the key
inOut.Out.Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
}
// Wait for any async responses if any
wg.Wait()
close(asyncCh)
for a := range asyncCh {
resp.Responses[a.Idx] = a.Resp
}

return &resp, nil
}

type AsyncResp struct {
Idx int
Resp *RateLimitResp
}

type AsyncReq struct {
WG *sync.WaitGroup
AsyncCh chan AsyncResp
Req *RateLimitReq
Peer *PeerClient
Key string
Idx int
}

func (s *V1Instance) asyncRequests(ctx context.Context, req *AsyncReq) {
var attempts int
var err error

resp := AsyncResp{
Idx: req.Idx,
}

out <- inOut
return nil
}, InOut{In: item, Idx: i})
for {
if attempts > 5 {
resp.Resp = &RateLimitResp{
Error: fmt.Sprintf("GetPeer() keeps returning peers that are not connected for '%s' - '%s'", req.Key, err),
}
break
}
fan.Wait()
close(out)
}()

resp.Responses = make([]*RateLimitResp, len(r.Requests))
// Collect the async responses as they return
for i := range out {
resp.Responses[i.Idx] = i.Out
// If we are attempting again, the owner of the this rate limit might have changed to us!
if attempts != 0 {
if req.Peer.Info().IsOwner {
resp.Resp, err = s.getRateLimit(req.Req)
if err != nil {
resp.Resp = &RateLimitResp{
Error: fmt.Sprintf("while applying rate limit for '%s' - '%s'", req.Key, err),
}
}
break
}
}

// Make an RPC call to the peer that owns this rate limit
r, err := req.Peer.GetPeerRateLimit(ctx, req.Req)
if err != nil {
if IsNotReady(err) {
attempts++
req.Peer, err = s.GetPeer(req.Key)
if err != nil {
resp.Resp = &RateLimitResp{
Error: fmt.Sprintf("while finding peer that owns rate limit '%s' - '%s'", req.Key, err),
}
break
}
continue
}
resp.Resp = &RateLimitResp{
Error: fmt.Sprintf("while fetching rate limit '%s' from peer - '%s'", req.Key, err),
}
}
// Inform the client of the owner key of the key
resp.Resp = r
resp.Resp.Metadata = map[string]string{"owner": req.Peer.Info().GRPCAddress}
break
}

return &resp, nil
req.AsyncCh <- resp
req.WG.Done()
}

// getGlobalRateLimit handles rate limits that are marked as `Behavior = GLOBAL`. Rate limit responses
Expand Down