From 698d5d500282761993ce1c7d16773ac686ca21aa Mon Sep 17 00:00:00 2001 From: Vincent Miszczak Date: Fri, 18 May 2018 17:10:19 +0200 Subject: [PATCH] add rate limiter --- README.md | 5 ++++- dump.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 2d81022..0197155 100644 --- a/README.md +++ b/README.md @@ -18,10 +18,13 @@ Usage of ./whisper-to-graphite: Hostname/IP of the graphite server (default "127.0.0.1") -port int graphite Port (default 2003) + -pps int + Number of maximum points per second to send (0 means rate limiter is disabled) -protocol string Protocol to use to transfer graphite data (tcp/udp/nop) (default "tcp") -workers int - Workers to run in parallel (default 5) + Workers to run in parallel (default 5) + ``` Assuming you don't want to use this as testcase for your IO subsystem, diff --git a/dump.go b/dump.go index 346e135..33378e1 100644 --- a/dump.go +++ b/dump.go @@ -3,8 +3,6 @@ package main import ( "errors" "flag" - "github.com/bzed/go-whisper" - "github.com/marpaia/graphite-golang" "log" "math" "os" @@ -12,8 +10,58 @@ import ( "strconv" "strings" "sync" + "time" + + "github.com/bzed/go-whisper" + "github.com/marpaia/graphite-golang" ) +type rateLimiter struct { + pointsPerSecond int64 + currentPoints int64 + full chan bool + lock *sync.Mutex + enabled bool +} + +func newRateLimiter(pointsPerSecond int64) *rateLimiter { + rl := new(rateLimiter) + rl.pointsPerSecond = pointsPerSecond + rl.currentPoints = 0 + rl.full = make(chan bool) + rl.lock = new(sync.Mutex) + if pointsPerSecond == 0 { + rl.enabled = false + } else { + rl.enabled = true + go func() { + for { + time.Sleep(1 * time.Second) + select { + case <-rl.full: + default: + } + } + }() + return rl + } + return rl +} + +func (rl *rateLimiter) limit(n int64) { + if !rl.enabled { + return + } + rl.lock.Lock() + defer rl.lock.Unlock() + + rl.currentPoints += n + if rl.currentPoints >= rl.pointsPerSecond { + rl.full <- true + rl.currentPoints = 0 + } +} + func convertFilename(filename string, baseDirectory string) (string, error) { absFilename, err := filepath.Abs(filename) if err != nil { @@ -48,6 +96,7 @@ func sendWhisperData( graphiteConn *graphite.Graphite, fromTs int, toTs int, + rateLimiter *rateLimiter, ) error { metricName, err := convertFilename(filename, baseDirectory) if err != nil { @@ -75,6 +124,7 @@ func sendWhisperData( metrics = append(metrics, graphite.NewMetric(metricName, v, int64(interval))) } + rateLimiter.limit(int64(len(metrics))) err = graphiteConn.SendMetrics(metrics) if err != nil { return err @@ -107,8 +157,8 @@ func worker(ch chan string, graphitePort int, graphiteProtocol string, fromTs int, - toTs int) { - + toTs int, + rateLimiter *rateLimiter) { defer wg.Done() graphiteConn, err := graphite.GraphiteFactory(graphiteProtocol, graphiteHost, graphitePort, "") @@ -121,7 +171,7 @@ func worker(ch chan string, case path := <-ch: { - err := sendWhisperData(path, baseDirectory, graphiteConn, fromTs, toTs) + err := sendWhisperData(path, baseDirectory, graphiteConn, fromTs, toTs, rateLimiter) if err != nil { log.Println("Failed: " + path) log.Println(err) @@ -170,6 +220,10 @@ func main() { "to", 2147483647, "Ending timestamp to dump data up to") + pointsPerSecond := flag.Int64( + "pps", + 0, + "Number of maximum points per second to send (0 means rate limiter is disabled)") flag.Parse() if !(*graphiteProtocol == "tcp" || @@ -181,9 +235,10 @@ func main() { quit := make(chan int) var wg sync.WaitGroup + rl := newRateLimiter(*pointsPerSecond) wg.Add(*workers) for i := 0; i < *workers; i++ { - go worker(ch, quit, &wg, *baseDirectory, *graphiteHost, *graphitePort, *graphiteProtocol, *fromTs, *toTs) + go worker(ch, quit, &wg, *baseDirectory, *graphiteHost, *graphitePort, *graphiteProtocol, *fromTs, *toTs, rl) } go findWhisperFiles(ch, quit, *directory) wg.Wait()