Skip to content

Commit

Permalink
switch to use rate limiter
Browse files Browse the repository at this point in the history
Signed-off-by: Chengxuan Xing <[email protected]>
  • Loading branch information
Chengxuan committed Jun 13, 2024
1 parent 7976554 commit ad739e6
Showing 1 changed file with 15 additions and 38 deletions.
53 changes: 15 additions & 38 deletions internal/perf/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"math"
"net/url"
"os"
"os/signal"
Expand Down Expand Up @@ -548,53 +549,29 @@ perfLoop:
if pr.cfg.MaxActions > 0 && int64(getMetricVal(totalActionsCounter)) >= pr.cfg.MaxActions {
break perfLoop
}
rateLimiter := rate.NewLimiter(rate.Limit(math.MaxFloat64), math.MaxInt)

if pr.cfg.MaxSubmissionsPerSecond > 0 {
// control send rate
secondTicker := time.NewTicker(1 * time.Second)
select {
case <-signalCh:
break perfLoop
case <-secondTicker.C:
for j := 0; j < pr.cfg.MaxSubmissionsPerSecond; j++ {
pr.bfr <- j
}
i++
if time.Since(lastCheckedTime).Seconds() > pr.cfg.MaxTimePerAction.Seconds() {
if pr.detectDelinquentMsgs() && pr.cfg.DelinquentAction == conf.DelinquentActionExit.String() {
break perfLoop
}
lastCheckedTime = time.Now()
}
break
case <-timeout:
rateLimiter = rate.NewLimiter(rate.Limit(pr.cfg.MaxSubmissionsPerSecond), pr.cfg.MaxSubmissionsPerSecond)
}

select {
case <-signalCh:
break perfLoop
case pr.bfr <- i:
rateLimiter.Wait(pr.ctx)
i++
if time.Since(lastCheckedTime).Seconds() > pr.cfg.MaxTimePerAction.Seconds() {
if pr.detectDelinquentMsgs() && pr.cfg.DelinquentAction == conf.DelinquentActionExit.String() {
break perfLoop
}
lastCheckedTime = time.Now()
break
}

} else {
select {
case <-signalCh:
case <-timeout:
if pr.detectDelinquentMsgs() && pr.cfg.DelinquentAction == conf.DelinquentActionExit.String() {
break perfLoop
case pr.bfr <- i:
i++
if time.Since(lastCheckedTime).Seconds() > pr.cfg.MaxTimePerAction.Seconds() {
if pr.detectDelinquentMsgs() && pr.cfg.DelinquentAction == conf.DelinquentActionExit.String() {
break perfLoop
}
lastCheckedTime = time.Now()
}
break
case <-timeout:
if pr.detectDelinquentMsgs() && pr.cfg.DelinquentAction == conf.DelinquentActionExit.String() {
break perfLoop
}
lastCheckedTime = time.Now()
break
}
lastCheckedTime = time.Now()
}

}
Expand Down

0 comments on commit ad739e6

Please sign in to comment.