From d9db44e216167a56b48d7000c06d4054c9256bee Mon Sep 17 00:00:00 2001 From: Cory Watson Date: Mon, 16 Oct 2017 11:00:24 -0700 Subject: [PATCH 1/4] Combine global/local logic into one unified flush and parallelize sinks. --- flusher.go | 112 +++++++++++++++++------------------------------- flusher_test.go | 5 ++- metric_sink.go | 2 + plugin_test.go | 4 +- server.go | 3 +- server_test.go | 9 ++-- 6 files changed, 54 insertions(+), 81 deletions(-) diff --git a/flusher.go b/flusher.go index 7871e2a67..f80069a0a 100644 --- a/flusher.go +++ b/flusher.go @@ -11,6 +11,7 @@ import ( "net/http" "net/url" "runtime" + "sync" "time" "github.com/DataDog/datadog-go/statsd" @@ -19,13 +20,12 @@ import ( "github.com/stripe/veneur/trace" ) -const DatadogResourceKey = "resource" - // Flush collects sampler's metrics and passes them to sinks. -func (s *Server) Flush() { +func (s *Server) Flush(ctx context.Context) { span := tracer.StartSpan("flush").(*trace.Span) defer span.Finish() + // TODO Move this to an independent ticker routine or something? mem := &runtime.MemStats{} runtime.ReadMemStats(mem) @@ -33,93 +33,57 @@ func (s *Server) Flush() { s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0) s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0) - // right now we have only one destination plugin - // but eventually, this is where we would loop over our supported - // destinations - if s.IsLocal() { - s.FlushLocal(span.Attach(context.Background())) - } else { - s.FlushGlobal(span.Attach(context.Background())) - } -} - -// FlushGlobal sends any global metrics to their destination. -func (s *Server) FlushGlobal(ctx context.Context) { - span, _ := trace.StartSpanFromContext(ctx, "") - defer span.Finish() - + // TODO Why is this not in the worker the way the trace worker is set up? 
events, checks := s.EventWorker.Flush() s.Statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0) s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0) - go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately - go s.flushTraces(span.Attach(ctx)) // this too! - - percentiles := s.HistogramPercentiles - - tempMetrics, ms := s.tallyMetrics(percentiles) - - // the global veneur instance is also responsible for reporting the sets - // and global counters - ms.totalLength += ms.totalSets - ms.totalLength += ms.totalGlobalCounters - - finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) - - s.reportMetricsFlushCounts(ms) - - s.reportGlobalMetricsFlushCounts(ms) - - go func() { - for _, p := range s.getPlugins() { - start := time.Now() - err := p.Flush(span.Attach(ctx), finalMetrics) - s.Statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0) - if err != nil { - countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name()) - s.Statsd.Count(countName, 1, []string{}, 1.0) - } - s.Statsd.Gauge(fmt.Sprintf("flush.plugins.%s.post_metrics_total", p.Name()), float64(len(finalMetrics)), nil, 1.0) - } - }() - - // TODO Don't hardcode this - s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics) -} - -// FlushLocal takes the slices of metrics, combines then and marshals them to json -// for posting to Datadog. 
-func (s *Server) FlushLocal(ctx context.Context) { - span, _ := trace.StartSpanFromContext(ctx, "") - defer span.Finish() - - events, checks := s.EventWorker.Flush() - s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0) + // TODO Concurrency + for _, sink := range s.metricSinks { + sink.FlushEventsChecks(span.Attach(ctx), events, checks) + } - go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately - go s.flushTraces(span.Attach(ctx)) + go s.flushTraces(span.Attach(ctx)) // this too! // don't publish percentiles if we're a local veneur; that's the global // veneur's job var percentiles []float64 + var finalMetrics []samplers.InterMetric + + if !s.IsLocal() { + percentiles = s.HistogramPercentiles + } tempMetrics, ms := s.tallyMetrics(percentiles) - finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) + finalMetrics = s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) s.reportMetricsFlushCounts(ms) - // we don't report totalHistograms, totalSets, or totalTimers for local veneur instances - - // we cannot do this until we're done using tempMetrics within this function, - // since not everything in tempMetrics is safe for sharing - go s.flushForward(span.Attach(ctx), tempMetrics) + if s.IsLocal() { + go s.flushForward(span.Attach(ctx), tempMetrics) + } else { + s.reportGlobalMetricsFlushCounts(ms) + } // If there's nothing to flush, don't bother calling the plugins and stuff. 
if len(finalMetrics) == 0 { return } + wg := sync.WaitGroup{} + for _, sink := range s.metricSinks { + wg.Add(1) + go func(ms metricSink) { + err := ms.Flush(span.Attach(ctx), finalMetrics) + if err != nil { + log.WithError(err).WithField("sink", ms.Name()).Warn("Error flushin sink") + } + wg.Done() + }(sink) + } + wg.Wait() + go func() { for _, p := range s.getPlugins() { start := time.Now() @@ -132,9 +96,6 @@ func (s *Server) FlushLocal(ctx context.Context) { s.Statsd.Gauge(fmt.Sprintf("flush.plugins.%s.post_metrics_total", p.Name()), float64(len(finalMetrics)), nil, 1.0) } }() - - // TODO Don't harcode this - s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics) } type metricsSummary struct { @@ -195,6 +156,13 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu // 'local-only' histograms. ms.totalLocalSets + (ms.totalLocalTimers+ms.totalLocalHistograms)*(s.HistogramAggregates.Count+len(s.HistogramPercentiles)) + // Global instances also flush sets and global counters, so be sure and add + // them to the total size + if !s.IsLocal() { + ms.totalLength += ms.totalSets + ms.totalLength += ms.totalGlobalCounters + } + return tempMetrics, ms } diff --git a/flusher_test.go b/flusher_test.go index 0f63f0b22..05462c057 100644 --- a/flusher_test.go +++ b/flusher_test.go @@ -1,6 +1,7 @@ package veneur import ( + "context" "encoding/json" "fmt" "io" @@ -174,7 +175,7 @@ func testFlushTraceDatadog(t *testing.T, protobuf, jsn io.Reader) { assert.NoError(t, err) server.HandleTracePacket(packet) - server.Flush() + server.Flush(context.TODO()) // wait for remoteServer to process the POST select { @@ -208,5 +209,5 @@ func testFlushTraceLightstep(t *testing.T, protobuf, jsn io.Reader) { server.HandleTracePacket(packet) assert.NoError(t, err) - server.Flush() + server.Flush(context.TODO()) } diff --git a/metric_sink.go b/metric_sink.go index 810f575b6..1163dd951 100644 --- a/metric_sink.go +++ b/metric_sink.go @@ -14,6 +14,8 @@ import ( 
"github.com/stripe/veneur/trace" ) +const DatadogResourceKey = "resource" + type metricSink interface { Name() string Flush(context.Context, []samplers.InterMetric) error diff --git a/plugin_test.go b/plugin_test.go index 65614850b..ff3977483 100644 --- a/plugin_test.go +++ b/plugin_test.go @@ -82,7 +82,7 @@ func TestGlobalServerPluginFlush(t *testing.T) { }) } - f.server.Flush() + f.server.Flush(context.TODO()) } // TestLocalFilePluginRegister tests that we are able to register @@ -164,7 +164,7 @@ func TestGlobalServerS3PluginFlush(t *testing.T) { }) } - f.server.Flush() + f.server.Flush(context.TODO()) } func parseGzipTSV(r io.Reader) ([][]string, error) { diff --git a/server.go b/server.go index 2d2c783d9..97ed3a70b 100644 --- a/server.go +++ b/server.go @@ -3,6 +3,7 @@ package veneur import ( "bufio" "bytes" + "context" "crypto/tls" "crypto/x509" "crypto/x509/pkix" @@ -442,7 +443,7 @@ func (s *Server) Start() { ticker.Stop() return case <-ticker.C: - s.Flush() + s.Flush(context.TODO()) } } }() diff --git a/server_test.go b/server_test.go index 0cc7d61d6..cfb48d243 100644 --- a/server_test.go +++ b/server_test.go @@ -3,6 +3,7 @@ package veneur import ( "bytes" "compress/zlib" + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -252,7 +253,7 @@ func TestLocalServerUnaggregatedMetrics(t *testing.T) { }) } - f.server.Flush() + f.server.Flush(context.TODO()) ddmetrics := <-f.ddmetrics assert.Equal(t, 6, len(ddmetrics.Series), "incorrect number of elements in the flushed series on the remote server") @@ -280,7 +281,7 @@ func TestGlobalServerFlush(t *testing.T) { }) } - f.server.Flush() + f.server.Flush(context.TODO()) ddmetrics := <-f.ddmetrics assert.Equal(t, len(expectedMetrics), len(ddmetrics.Series), "incorrect number of elements in the flushed series on the remote server") @@ -390,7 +391,7 @@ func TestLocalServerMixedMetrics(t *testing.T) { }) } - f.server.Flush() + f.server.Flush(context.TODO()) // the global veneur instance should get valid data td := 
<-globalTD @@ -540,7 +541,7 @@ func sendTCPMetrics(addr string, tlsConfig *tls.Config, f *fixture) error { // check that the server received the stats; HACK: sleep to ensure workers process before flush time.Sleep(20 * time.Millisecond) - f.server.Flush() + f.server.Flush(context.TODO()) select { case ddmetrics := <-f.ddmetrics: if len(ddmetrics.Series) != 1 { From c688eb56cf8f96803b1e0b0be5f69c7544918b5b Mon Sep 17 00:00:00 2001 From: Cory Watson Date: Thu, 19 Oct 2017 14:12:31 -0700 Subject: [PATCH 2/4] Feedback fixes, comments and typos --- flusher.go | 5 ++--- flusher_test.go | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/flusher.go b/flusher.go index f80069a0a..bc4b15568 100644 --- a/flusher.go +++ b/flusher.go @@ -25,7 +25,6 @@ func (s *Server) Flush(ctx context.Context) { span := tracer.StartSpan("flush").(*trace.Span) defer span.Finish() - // TODO Move this to an independent ticker routine or something? mem := &runtime.MemStats{} runtime.ReadMemStats(mem) @@ -43,7 +42,7 @@ func (s *Server) Flush(ctx context.Context) { sink.FlushEventsChecks(span.Attach(ctx), events, checks) } - go s.flushTraces(span.Attach(ctx)) // this too! 
+ go s.flushTraces(span.Attach(ctx)) // don't publish percentiles if we're a local veneur; that's the global // veneur's job @@ -77,7 +76,7 @@ func (s *Server) Flush(ctx context.Context) { go func(ms metricSink) { err := ms.Flush(span.Attach(ctx), finalMetrics) if err != nil { - log.WithError(err).WithField("sink", ms.Name()).Warn("Error flushin sink") + log.WithError(err).WithField("sink", ms.Name()).Warn("Error flushing sink") } wg.Done() }(sink) diff --git a/flusher_test.go b/flusher_test.go index 05462c057..da1fab50d 100644 --- a/flusher_test.go +++ b/flusher_test.go @@ -175,7 +175,7 @@ func testFlushTraceDatadog(t *testing.T, protobuf, jsn io.Reader) { assert.NoError(t, err) server.HandleTracePacket(packet) - server.Flush(context.TODO()) + server.Flush(context.Background()) // wait for remoteServer to process the POST select { @@ -209,5 +209,5 @@ func testFlushTraceLightstep(t *testing.T, protobuf, jsn io.Reader) { server.HandleTracePacket(packet) assert.NoError(t, err) - server.Flush(context.TODO()) + server.Flush(context.Background()) } From 7555ec246a4199cad2e994ed2701b4b96ab9c5c0 Mon Sep 17 00:00:00 2001 From: Cory Watson Date: Fri, 20 Oct 2017 13:27:35 -0700 Subject: [PATCH 3/4] Document sink's flush --- metric_sink.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metric_sink.go b/metric_sink.go index 1163dd951..bab064e58 100644 --- a/metric_sink.go +++ b/metric_sink.go @@ -18,6 +18,9 @@ const DatadogResourceKey = "resource" type metricSink interface { Name() string + // Flush receives `InterMetric`s from Veneur and is responsible for "sinking" + // these metrics to whatever its backend wants. Note that the sink must + // **not** mutate the incoming metrics as they are shared with other sinks. Flush(context.Context, []samplers.InterMetric) error // This one is temporary? 
FlushEventsChecks(ctx context.Context, events []samplers.UDPEvent, checks []samplers.UDPServiceCheck) From a137e5cea375a0f05d62b7c52e1825e1f06507bd Mon Sep 17 00:00:00 2001 From: Cory Watson Date: Mon, 23 Oct 2017 13:41:25 -0700 Subject: [PATCH 4/4] Update changelog and readme, fix TODO --- CHANGELOG.md | 5 ++++- README.md | 2 +- plugin_test.go | 4 ++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ef01bf952..1294dfc1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ -# 1.8, pending +# 1.8.0, pending + +## Improvements +* Veneur no longer **requires** the use of Datadog as a target for flushes. Veneur can now use one or more of any of its supported sinks as a backend. This realizes our desire for Veneur to be fully vendor agnostic. Thanks [gphat](https://github.com/gphat)! ## Bugfixes * Fix a panic when using `veneur-emit` to emit metrics via `-ssf` when no tags are specified. Thanks [myndzi](https://github.com/myndzi) diff --git a/README.md b/README.md index 015ca599d..a6fb49a7d 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ [![Build Status](https://travis-ci.org/stripe/veneur.svg?branch=master)](https://travis-ci.org/stripe/veneur) [![GoDoc](https://godoc.org/github.com/stripe/veneur?status.svg)](http://godoc.org/github.com/stripe/veneur) -Veneur (venn-urr) is a distributed, fault-tolerant pipeline for runtime data. It provides a server implementation of the [DogStatsD protocol](http://docs.datadoghq.com/guides/dogstatsd/#datagram-format) for aggregating metrics and sending them to downstream storage, typically [Datadog](http://datadoghq.com). It can also act as a [global aggregator](#global-aggregation) for histograms, sets and counters. +Veneur (venn-urr) is a distributed, fault-tolerant pipeline for runtime data. 
It provides a server implementation of the [DogStatsD protocol](http://docs.datadoghq.com/guides/dogstatsd/#datagram-format) or [SSF](https://github.com/stripe/veneur/tree/master/ssf) for aggregating metrics and sending them to downstream storage via one or more supported sinks. It can also act as a [global aggregator](#global-aggregation) for histograms, sets and counters. More generically, Veneur is a convenient sink for various observability primitives. diff --git a/plugin_test.go b/plugin_test.go index ff3977483..adf36197b 100644 --- a/plugin_test.go +++ b/plugin_test.go @@ -82,7 +82,7 @@ func TestGlobalServerPluginFlush(t *testing.T) { }) } - f.server.Flush(context.TODO()) + f.server.Flush(context.Background()) } // TestLocalFilePluginRegister tests that we are able to register @@ -164,7 +164,7 @@ func TestGlobalServerS3PluginFlush(t *testing.T) { }) } - f.server.Flush(context.TODO()) + f.server.Flush(context.Background()) } func parseGzipTSV(r io.Reader) ([][]string, error) {