Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine global/local logic into one unified flush and parallelize sinks. #292

Merged
merged 4 commits into from
Oct 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# 1.8, pending
# 1.8.0, pending

## Improvements
* Veneur no longer **requires** the use of Datadog as a target for flushes. Veneur can now use one or more of any of its supported sinks as a backend. This realizes our desire for Veneur to be fully vendor agnostic. Thanks [gphat](https://github.com/gphat)!

## Bugfixes
* Fix a panic when using `veneur-emit` to emit metrics via `-ssf` when no tags are specified. Thanks [myndzi](https://github.com/myndzi)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
[![Build Status](https://travis-ci.org/stripe/veneur.svg?branch=master)](https://travis-ci.org/stripe/veneur)
[![GoDoc](https://godoc.org/github.com/stripe/veneur?status.svg)](http://godoc.org/github.com/stripe/veneur)

Veneur (venn-urr) is a distributed, fault-tolerant pipeline for runtime data. It provides a server implementation of the [DogStatsD protocol](http://docs.datadoghq.com/guides/dogstatsd/#datagram-format) for aggregating metrics and sending them to downstream storage, typically [Datadog](http://datadoghq.com). It can also act as a [global aggregator](#global-aggregation) for histograms, sets and counters.
Veneur (venn-urr) is a distributed, fault-tolerant pipeline for runtime data. It provides a server implementation of the [DogStatsD protocol](http://docs.datadoghq.com/guides/dogstatsd/#datagram-format) or [SSF](https://github.com/stripe/veneur/tree/master/ssf) for aggregating metrics and sending them to one or more supported downstream storage sinks. It can also act as a [global aggregator](#global-aggregation) for histograms, sets and counters.

More generically, Veneur is a convenient sink for various observability primitives.

Expand Down
109 changes: 38 additions & 71 deletions flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"net/url"
"runtime"
"sync"
"time"

"github.com/DataDog/datadog-go/statsd"
Expand All @@ -19,10 +20,8 @@ import (
"github.com/stripe/veneur/trace"
)

const DatadogResourceKey = "resource"

// Flush collects sampler's metrics and passes them to sinks.
func (s *Server) Flush() {
func (s *Server) Flush(ctx context.Context) {
span := tracer.StartSpan("flush").(*trace.Span)
defer span.Finish()

Expand All @@ -33,93 +32,57 @@ func (s *Server) Flush() {
s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0)
s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0)

// right now we have only one destination plugin
// but eventually, this is where we would loop over our supported
// destinations
if s.IsLocal() {
s.FlushLocal(span.Attach(context.Background()))
} else {
s.FlushGlobal(span.Attach(context.Background()))
}
}

// FlushGlobal sends any global metrics to their destination.
func (s *Server) FlushGlobal(ctx context.Context) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

// TODO Why is this not in the worker the way the trace worker is set up?
events, checks := s.EventWorker.Flush()
s.Statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0)
s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0)

go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately
go s.flushTraces(span.Attach(ctx)) // this too!

percentiles := s.HistogramPercentiles

tempMetrics, ms := s.tallyMetrics(percentiles)

// the global veneur instance is also responsible for reporting the sets
// and global counters
ms.totalLength += ms.totalSets
ms.totalLength += ms.totalGlobalCounters

finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)

s.reportMetricsFlushCounts(ms)

s.reportGlobalMetricsFlushCounts(ms)

go func() {
for _, p := range s.getPlugins() {
start := time.Now()
err := p.Flush(span.Attach(ctx), finalMetrics)
s.Statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0)
if err != nil {
countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name())
s.Statsd.Count(countName, 1, []string{}, 1.0)
}
s.Statsd.Gauge(fmt.Sprintf("flush.plugins.%s.post_metrics_total", p.Name()), float64(len(finalMetrics)), nil, 1.0)
}
}()

// TODO Don't hardcode this
s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics)
}

// FlushLocal takes the slices of metrics, combines them and marshals them to json
// for posting to Datadog.
func (s *Server) FlushLocal(ctx context.Context) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

events, checks := s.EventWorker.Flush()
s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0)
// TODO Concurrency
for _, sink := range s.metricSinks {
sink.FlushEventsChecks(span.Attach(ctx), events, checks)
}

go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately
go s.flushTraces(span.Attach(ctx))

// don't publish percentiles if we're a local veneur; that's the global
// veneur's job
var percentiles []float64
var finalMetrics []samplers.InterMetric

if !s.IsLocal() {
percentiles = s.HistogramPercentiles
}

tempMetrics, ms := s.tallyMetrics(percentiles)

finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)
finalMetrics = s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)

s.reportMetricsFlushCounts(ms)

// we don't report totalHistograms, totalSets, or totalTimers for local veneur instances

// we cannot do this until we're done using tempMetrics within this function,
// since not everything in tempMetrics is safe for sharing
go s.flushForward(span.Attach(ctx), tempMetrics)
if s.IsLocal() {
go s.flushForward(span.Attach(ctx), tempMetrics)
} else {
s.reportGlobalMetricsFlushCounts(ms)
}

// If there's nothing to flush, don't bother calling the plugins and stuff.
if len(finalMetrics) == 0 {
return
}

wg := sync.WaitGroup{}
for _, sink := range s.metricSinks {
wg.Add(1)
go func(ms metricSink) {
err := ms.Flush(span.Attach(ctx), finalMetrics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that this is happening asynchronously, we'll want to make doubly-sure that our interface definition specifies that the Flush method is not allowed to mutate the slice it receives.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely! How do we do that? :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add it to the interface definition in metric_sink.go.

if err != nil {
log.WithError(err).WithField("sink", ms.Name()).Warn("Error flushing sink")
}
wg.Done()
}(sink)
}
wg.Wait()

go func() {
for _, p := range s.getPlugins() {
start := time.Now()
Expand All @@ -132,9 +95,6 @@ func (s *Server) FlushLocal(ctx context.Context) {
s.Statsd.Gauge(fmt.Sprintf("flush.plugins.%s.post_metrics_total", p.Name()), float64(len(finalMetrics)), nil, 1.0)
}
}()

// TODO Don't harcode this
s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics)
}

type metricsSummary struct {
Expand Down Expand Up @@ -195,6 +155,13 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu
// 'local-only' histograms.
ms.totalLocalSets + (ms.totalLocalTimers+ms.totalLocalHistograms)*(s.HistogramAggregates.Count+len(s.HistogramPercentiles))

// Global instances also flush sets and global counters, so be sure and add
// them to the total size
if !s.IsLocal() {
ms.totalLength += ms.totalSets
ms.totalLength += ms.totalGlobalCounters
}

return tempMetrics, ms
}

Expand Down
5 changes: 3 additions & 2 deletions flusher_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package veneur

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -174,7 +175,7 @@ func testFlushTraceDatadog(t *testing.T, protobuf, jsn io.Reader) {
assert.NoError(t, err)

server.HandleTracePacket(packet)
server.Flush()
server.Flush(context.Background())

// wait for remoteServer to process the POST
select {
Expand Down Expand Up @@ -208,5 +209,5 @@ func testFlushTraceLightstep(t *testing.T, protobuf, jsn io.Reader) {
server.HandleTracePacket(packet)

assert.NoError(t, err)
server.Flush()
server.Flush(context.Background())
}
5 changes: 5 additions & 0 deletions metric_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ import (
"github.com/stripe/veneur/trace"
)

const DatadogResourceKey = "resource"

type metricSink interface {
Name() string
// Flush receives `InterMetric`s from Veneur and is responsible for "sinking"
// these metrics to whatever its backend wants. Note that the sink must
// **not** mutate the incoming metrics as they are shared with other sinks.
Flush(context.Context, []samplers.InterMetric) error
// This one is temporary?
FlushEventsChecks(ctx context.Context, events []samplers.UDPEvent, checks []samplers.UDPServiceCheck)
Expand Down
4 changes: 2 additions & 2 deletions plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestGlobalServerPluginFlush(t *testing.T) {
})
}

f.server.Flush()
f.server.Flush(context.Background())
}

// TestLocalFilePluginRegister tests that we are able to register
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestGlobalServerS3PluginFlush(t *testing.T) {
})
}

f.server.Flush()
f.server.Flush(context.Background())
}

func parseGzipTSV(r io.Reader) ([][]string, error) {
Expand Down
3 changes: 2 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package veneur
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
Expand Down Expand Up @@ -442,7 +443,7 @@ func (s *Server) Start() {
ticker.Stop()
return
case <-ticker.C:
s.Flush()
s.Flush(context.TODO())
}
}
}()
Expand Down
9 changes: 5 additions & 4 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package veneur
import (
"bytes"
"compress/zlib"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand Down Expand Up @@ -252,7 +253,7 @@ func TestLocalServerUnaggregatedMetrics(t *testing.T) {
})
}

f.server.Flush()
f.server.Flush(context.TODO())

ddmetrics := <-f.ddmetrics
assert.Equal(t, 6, len(ddmetrics.Series), "incorrect number of elements in the flushed series on the remote server")
Expand Down Expand Up @@ -280,7 +281,7 @@ func TestGlobalServerFlush(t *testing.T) {
})
}

f.server.Flush()
f.server.Flush(context.TODO())

ddmetrics := <-f.ddmetrics
assert.Equal(t, len(expectedMetrics), len(ddmetrics.Series), "incorrect number of elements in the flushed series on the remote server")
Expand Down Expand Up @@ -390,7 +391,7 @@ func TestLocalServerMixedMetrics(t *testing.T) {
})
}

f.server.Flush()
f.server.Flush(context.TODO())

// the global veneur instance should get valid data
td := <-globalTD
Expand Down Expand Up @@ -540,7 +541,7 @@ func sendTCPMetrics(addr string, tlsConfig *tls.Config, f *fixture) error {

// check that the server received the stats; HACK: sleep to ensure workers process before flush
time.Sleep(20 * time.Millisecond)
f.server.Flush()
f.server.Flush(context.TODO())
select {
case ddmetrics := <-f.ddmetrics:
if len(ddmetrics.Series) != 1 {
Expand Down