Skip to content

Commit

Permalink
Merge pull request #84 from kaleido-io/sendrate-control
Browse files Browse the repository at this point in the history
Adding control for submission rate.
  • Loading branch information
EnriqueL8 authored Jun 17, 2024
2 parents 74b4ae8 + a980fb8 commit 953d836
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ There are various options for creating your own customized tests. A full list of
- All values default to `0` which has the effect of not limiting the rate of the test.
- The test will allow at most `startRate` actions to happen per second. Over the period of `rateRampUpTime` seconds the allowed rate will increase linearly until `endRate` actions per seconds are reached. At this point the test will continue at `endRate` actions per second until the test finishes.
- If `startRate` is the only value that is set, the test will run at that rate for the entire test.
- Waiting for mint transactions to be confirmed before doing the next one
- Waiting for events to be confirmed before doing the next submission
- See `noWaitSubmission` (defaults to `false`).
- When set to `true` each worker routine will perform its action (e.g. minting a token) and wait for confirmation of that event before doing its next action.
- `maxSubmissionsPerSecond` can be used to control the maximum number of submissions per second to avoid overloading the system under test.
- Setting the features of a token being tested
- See `supportsData` and `supportsURI` attributes of a test instance.
- `supportsData` defaults to `true` since the sample token contract used by FireFly supports minting tokens with data. When set to `true` the message included in the mint transaction will include the ID of the worker routine and used to correlate received confirmation events.
Expand Down
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func generateRunnerConfigFromInstance(instance *conf.InstanceConfig, perfConfig
runnerConfig.LogLevel = perfConfig.LogLevel
runnerConfig.SkipMintConfirmations = instance.SkipMintConfirmations
runnerConfig.NoWaitSubmission = instance.NoWaitSubmission
runnerConfig.MaxSubmissionsPerSecond = instance.MaxSubmissionsPerSecond
runnerConfig.Length = instance.Length
runnerConfig.Daemon = perfConfig.Daemon
runnerConfig.LogEvents = perfConfig.LogEvents
Expand Down
2 changes: 2 additions & 0 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type RunnerConfig struct {
RampLength time.Duration
SkipMintConfirmations bool // deprecated
NoWaitSubmission bool
MaxSubmissionsPerSecond int
SubscriptionCoreOptions *core.SubscriptionCoreOptions
}

Expand Down Expand Up @@ -87,6 +88,7 @@ type InstanceConfig struct {
RampLength time.Duration `json:"rampLength,omitempty" yaml:"rampLength,omitempty"`
SkipMintConfirmations bool `json:"skipMintConfirmations" yaml:"skipMintConfirmations"` // deprecated
NoWaitSubmission bool `json:"noWaitSubmission" yaml:"noWaitSubmission"`
MaxSubmissionsPerSecond int `json:"maxSubmissionsPerSecond" yaml:"maxSubmissionsPerSecond"`
DelinquentAction string `json:"delinquentAction,omitempty" yaml:"delinquentAction,omitempty"`
PerWorkerSigningKeyPrefix string `json:"perWorkerSigningKeyPrefix,omitempty" yaml:"perWorkerSigningKeyPrefix,omitempty"`
SubscriptionCoreOptions *core.SubscriptionCoreOptions `json:"subscriptionOptions,omitempty" yaml:"subscriptionOptions,omitempty"`
Expand Down
18 changes: 13 additions & 5 deletions internal/perf/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"math"
"net/url"
"os"
"os/signal"
Expand Down Expand Up @@ -49,9 +50,7 @@ const workerPrefix = "worker-"
const preparePrefix = "prep-"

var mutex = &sync.Mutex{}
var limiter *rate.Limiter
var TRANSPORT_TYPE = "websockets"
var wsReadAhead = uint16(50)

var METRICS_NAMESPACE = "ffperf"
var METRICS_SUBSYSTEM = "runner"
Expand Down Expand Up @@ -540,10 +539,15 @@ func (pr *perfRunner) Start() (err error) {
i := 0
lastCheckedTime := time.Now()

rateLimiter := rate.NewLimiter(rate.Limit(math.MaxFloat64), math.MaxInt)

if pr.cfg.MaxSubmissionsPerSecond > 0 {
rateLimiter = rate.NewLimiter(rate.Limit(pr.cfg.MaxSubmissionsPerSecond), pr.cfg.MaxSubmissionsPerSecond)
}
log.Infof("Sending rate: %f per second with %d burst", rateLimiter.Limit(), rateLimiter.Burst())
perfLoop:
for pr.daemon || time.Now().Unix() < pr.endTime {
timeout := time.After(60 * time.Second)

// If we've been given a maximum number of actions to perform, check if we're done
if pr.cfg.MaxActions > 0 && int64(getMetricVal(totalActionsCounter)) >= pr.cfg.MaxActions {
break perfLoop
Expand All @@ -553,21 +557,25 @@ perfLoop:
case <-signalCh:
break perfLoop
case pr.bfr <- i:
err = rateLimiter.Wait(pr.ctx)
if err != nil {
log.Panic(fmt.Errorf("rate limiter failed"))
break perfLoop
}
i++
if time.Since(lastCheckedTime).Seconds() > pr.cfg.MaxTimePerAction.Seconds() {
if pr.detectDelinquentMsgs() && pr.cfg.DelinquentAction == conf.DelinquentActionExit.String() {
break perfLoop
}
lastCheckedTime = time.Now()
}
break
case <-timeout:
if pr.detectDelinquentMsgs() && pr.cfg.DelinquentAction == conf.DelinquentActionExit.String() {
break perfLoop
}
lastCheckedTime = time.Now()
break
}

}

// If configured, check that the balance of the mint recipient address is correct
Expand Down

0 comments on commit 953d836

Please sign in to comment.