From 3316668393ee9e38025e9b8742144c443d8db3ea Mon Sep 17 00:00:00 2001 From: Cory Watson Date: Thu, 19 Oct 2017 08:20:43 -0700 Subject: [PATCH] Move runtime metrics reporting to a seprate timer routine. --- config.go | 1 + config_parse.go | 11 +++++------ config_test.go | 2 +- example.yaml | 5 +++-- flusher.go | 8 -------- server.go | 40 ++++++++++++++++++++++++++++++++++++---- server_test.go | 5 +++-- 7 files changed, 49 insertions(+), 23 deletions(-) diff --git a/config.go b/config.go index a49f1e713..9ea7bddb9 100644 --- a/config.go +++ b/config.go @@ -28,6 +28,7 @@ type Config struct { OmitEmptyHostname bool `yaml:"omit_empty_hostname"` Percentiles []float64 `yaml:"percentiles"` ReadBufferSizeBytes int `yaml:"read_buffer_size_bytes"` + RuntimeMetricsFlushInterval string `yaml:"runtime_metrics_flush_interval"` SentryDsn string `yaml:"sentry_dsn"` SsfAddress string `yaml:"ssf_address"` SsfBufferSize int `yaml:"ssf_buffer_size"` diff --git a/config_parse.go b/config_parse.go index a257cd2a3..578b256ca 100644 --- a/config_parse.go +++ b/config_parse.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "net/url" "os" - "time" "github.com/kelseyhightower/envconfig" @@ -137,10 +136,10 @@ func readConfig(r io.Reader) (c Config, err error) { } c.SsfListenAddresses = append(c.SsfListenAddresses, ssfAddrs...) - return c, nil -} + if c.RuntimeMetricsFlushInterval == "" { + // Default the runtime metrics to the flush interval if not overriden + c.RuntimeMetricsFlushInterval = c.Interval + } -// ParseInterval handles parsing the flush interval as a time.Duration -func (c Config) ParseInterval() (time.Duration, error) { - return time.ParseDuration(c.Interval) + return c, nil } diff --git a/config_test.go b/config_test.go index fa7928b92..dc3e7b430 100644 --- a/config_test.go +++ b/config_test.go @@ -22,7 +22,7 @@ func TestReadConfig(t *testing.T) { assert.Equal(t, "https://app.datadoghq.com", c.DatadogAPIHostname) assert.Equal(t, 96, c.NumWorkers) - interval, err := c.ParseInterval() + interval, err := time.ParseDuration(c.Interval) assert.NoError(t, err) assert.Equal(t, interval, 10*time.Second) diff --git a/example.yaml b/example.yaml index b4934034b..730fd1327 100644 --- a/example.yaml +++ b/example.yaml @@ -105,7 +105,6 @@ num_workers: 96 num_readers: 1 - # == LIMITS == # How big of a buffer to allocate for incoming metrics. Metrics longer than this @@ -127,7 +126,6 @@ read_buffer_size_bytes: 2097152 # will post multiple times in parallel if the limit is exceeded. flush_max_per_body: 25000 - # == DIAGNOSTICS == # Sets the log level to DEBUG @@ -139,6 +137,9 @@ sentry_dsn: "" # Enables Go profiling enable_profiling: false +# Veneur emits metrics about itself. By default it matches the `interval`, but +# you can tune it to a different interval if desired. +runtime_metrics_flush_interval: "10s" # == SINKS == diff --git a/flusher.go b/flusher.go index 7871e2a67..1b2681e6a 100644 --- a/flusher.go +++ b/flusher.go @@ -10,7 +10,6 @@ import ( "net" "net/http" "net/url" - "runtime" "time" "github.com/DataDog/datadog-go/statsd" @@ -26,13 +25,6 @@ func (s *Server) Flush() { span := tracer.StartSpan("flush").(*trace.Span) defer span.Finish() - mem := &runtime.MemStats{} - runtime.ReadMemStats(mem) - - s.Statsd.Gauge("mem.heap_alloc_bytes", float64(mem.HeapAlloc), nil, 1.0) - s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0) - s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0) - // right now we have only one destination plugin // but eventually, this is where we would loop over our supported // destinations diff --git a/server.go b/server.go index 2d2c783d9..c9fbfb24a 100644 --- a/server.go +++ b/server.go @@ -11,6 +11,7 @@ import ( "io" "net" "net/http" + "runtime" "strings" "sync" "syscall" @@ -87,10 +88,11 @@ type Server struct { SSFListenAddrs []net.Addr RcvbufBytes int - interval time.Duration - numReaders int - metricMaxLength int - traceMaxLengthBytes int + interval time.Duration + runtimeMetricInterval time.Duration + numReaders int + metricMaxLength int + traceMaxLengthBytes int tlsConfig *tls.Config tcpReadTimeout time.Duration @@ -146,6 +148,12 @@ func NewFromConfig(conf Config) (ret Server, err error) { if err != nil { return } + + ret.runtimeMetricInterval, err = time.ParseDuration(conf.RuntimeMetricsFlushInterval) + if err != nil { + return + } + ret.HTTPClient = &http.Client{ // make sure that POSTs to datadog do not overflow the flush interval Timeout: ret.interval * 9 / 10, @@ -429,6 +437,30 @@ func (s *Server) Start() { logrus.Info("Tracing sockets are configured - not reading trace socket") } + // Emit runtime metrics about ourselves on a timer. + go func() { + defer func() { + ConsumePanic(s.Sentry, s.Statsd, s.Hostname, recover()) + }() + ticker := time.NewTicker(s.runtimeMetricInterval) + for { + select { + case <-s.shutdown: + // stop flushing on graceful shutdown + ticker.Stop() + return + case <-ticker.C: + mem := &runtime.MemStats{} + runtime.ReadMemStats(mem) + + s.Statsd.Gauge("mem.heap_alloc_bytes", float64(mem.HeapAlloc), nil, 1.0) + s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0) + s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0) + s.Statsd.Gauge("goroutines", float64(runtime.NumGoroutine()), nil, 1.0) + } + } + }() + // Flush every Interval forever! go func() { defer func() { diff --git a/server_test.go b/server_test.go index 0cc7d61d6..0d8d92826 100644 --- a/server_test.go +++ b/server_test.go @@ -76,7 +76,8 @@ func generateConfig(forwardAddr string) Config { Hostname: "localhost", // Use a shorter interval for tests - Interval: DefaultFlushInterval.String(), + Interval: DefaultFlushInterval.String(), + RuntimeMetricsFlushInterval: DefaultFlushInterval.String(), Key: "", MetricMaxLength: 4096, Percentiles: []float64{.5, .75, .99}, @@ -189,7 +190,7 @@ type fixture struct { } func newFixture(t *testing.T, config Config) *fixture { - interval, err := config.ParseInterval() + interval, err := time.ParseDuration(config.Interval) assert.NoError(t, err) // Set up a remote server (the API that we're sending the data to)