Skip to content

Commit

Permalink
Merge branch 'main' into flavien/envoy/resourcename
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahkm authored Jan 6, 2025
2 parents d446c41 + 5c4d2b5 commit 3e5a1fe
Show file tree
Hide file tree
Showing 14 changed files with 297 additions and 33 deletions.
28 changes: 21 additions & 7 deletions ddtrace/tracer/abandonedspans.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"sync/atomic"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

Expand Down Expand Up @@ -77,27 +79,36 @@ type abandonedSpanCandidate struct {
TraceID, SpanID uint64
Start int64
Finished bool
Integration string
}

func newAbandonedSpanCandidate(s *span, finished bool) *abandonedSpanCandidate {
var component string
if v, ok := s.Meta[ext.Component]; ok {
component = v
} else {
component = "manual"
}
// finished is passed explicitly rather than read from s.finished, which may not
// be set yet when this function is called.
// Also, locking is not required because this is called either while the span is
// already locked or while it is being initialized.
return &abandonedSpanCandidate{
Name: s.Name,
TraceID: s.TraceID,
SpanID: s.SpanID,
Start: s.Start,
Finished: finished,
c := &abandonedSpanCandidate{
Name: s.Name,
TraceID: s.TraceID,
SpanID: s.SpanID,
Start: s.Start,
Finished: finished,
Integration: component,
}
return c
}

// String takes a span and returns a human-readable string representing that span.
func (s *abandonedSpanCandidate) String() string {
age := now() - s.Start
a := fmt.Sprintf("%d sec", age/1e9)
return fmt.Sprintf("[name: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, s.SpanID, s.TraceID, a)
return fmt.Sprintf("[name: %s, integration: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, s.Integration, s.SpanID, s.TraceID, a)
}

type abandonedSpansDebugger struct {
Expand Down Expand Up @@ -292,6 +303,9 @@ func formatAbandonedSpans(b *bucket[uint64, *abandonedSpanCandidate], interval *
if interval != nil && curTime-s.Start < interval.Nanoseconds() {
continue
}
if t, ok := internal.GetGlobalTracer().(*tracer); ok {
t.statsd.Incr("datadog.tracer.abandoned_spans", []string{"name:" + s.Name, "integration:" + s.Integration}, 1)
}
spanCount++
msg := s.String()
sb.WriteString(msg)
Expand Down
55 changes: 54 additions & 1 deletion ddtrace/tracer/abandonedspans_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/statsdtest"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -58,11 +60,62 @@ func assertProcessedSpans(assert *assert.Assertions, t *tracer, startedSpans, fi

func formatSpanString(s *span) string {
s.Lock()
msg := fmt.Sprintf("[name: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, s.SpanID, s.TraceID, spanAge(s))
var integration string
if v, ok := s.Meta[ext.Component]; ok {
integration = v
} else {
integration = "manual"
}
msg := fmt.Sprintf("[name: %s, integration: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, integration, s.SpanID, s.TraceID, spanAge(s))
s.Unlock()
return msg
}

// TestAbandonedSpansMetric inspects the "datadog.tracer.abandoned_spans" statsd
// calls recorded once assertProcessedSpans has returned:
//   - "finished": a span that was finished produces no call;
//   - "open": a span that is never finished produces exactly one call, tagged
//     with the span name and the value of its ext.Component tag;
//   - "both": with one finished and one open span, exactly one call is recorded.
func TestAbandonedSpansMetric(t *testing.T) {
	assert := assert.New(t)
	var tg statsdtest.TestStatsdClient
	tp := new(log.RecordLogger)

	// tickerInterval is package-level state. Restore it on exit so the
	// shortened interval does not leak into tests that run after this one.
	oldTickerInterval := tickerInterval
	defer func() { tickerInterval = oldTickerInterval }()
	tickerInterval = 100 * time.Millisecond

	t.Run("finished", func(t *testing.T) {
		tp.Reset()
		tg.Reset()
		defer setTestTime()()
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond), withStatsdClient(&tg))
		defer stop()
		s := tracer.StartSpan("operation", StartTime(spanStart)).(*span)
		s.Finish()
		assertProcessedSpans(assert, tracer, 1, 1)
		assert.Empty(tg.GetCallsByName("datadog.tracer.abandoned_spans"))
	})
	t.Run("open", func(t *testing.T) {
		tp.Reset()
		tg.Reset()
		defer setTestTime()()
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond), withStatsdClient(&tg))
		defer stop()
		// The span is deliberately never finished.
		tracer.StartSpan("operation", StartTime(spanStart), Tag(ext.Component, "some_integration_name"))
		assertProcessedSpans(assert, tracer, 1, 0)
		calls := tg.GetCallsByName("datadog.tracer.abandoned_spans")
		if !assert.Len(calls, 1) {
			// The failure is already recorded; bail out instead of
			// panicking on an out-of-range index below.
			return
		}
		assert.Equal([]string{"name:operation", "integration:some_integration_name"}, calls[0].Tags())
	})
	t.Run("both", func(t *testing.T) {
		tp.Reset()
		tg.Reset()
		defer setTestTime()()
		tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond), withStatsdClient(&tg))
		defer stop()
		sf := tracer.StartSpan("op", StartTime(spanStart)).(*span)
		sf.Finish()
		s := tracer.StartSpan("op2", StartTime(spanStart)).(*span)
		assertProcessedSpans(assert, tracer, 2, 1)
		calls := tg.GetCallsByName("datadog.tracer.abandoned_spans")
		assert.Len(calls, 1)
		// Finish the second span only after the metric has been checked.
		s.Finish()
	})
}

func TestReportAbandonedSpans(t *testing.T) {
assert := assert.New(t)
tp := new(log.RecordLogger)
Expand Down
4 changes: 3 additions & 1 deletion ddtrace/tracer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func (t *tracer) reportRuntimeMetrics(interval time.Duration) {
}
}

func (t *tracer) reportHealthMetrics(interval time.Duration) {
// reportHealthMetricsAtInterval reports noisy health metrics at the specified interval.
// The periodic reporting ensures metrics are delivered without overwhelming the system or logs.
func (t *tracer) reportHealthMetricsAtInterval(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
Expand Down
28 changes: 26 additions & 2 deletions ddtrace/tracer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestReportRuntimeMetrics(t *testing.T) {
assert.Contains(calls, "runtime.go.gc_stats.pause_quantiles.75p")
}

func TestReportHealthMetrics(t *testing.T) {
func TestReportHealthMetricsAtInterval(t *testing.T) {
assert := assert.New(t)
var tg statsdtest.TestStatsdClient

Expand All @@ -55,12 +55,36 @@ func TestReportHealthMetrics(t *testing.T) {

tracer.StartSpan("operation").Finish()
flush(1)
tg.Wait(assert, 3, 10*time.Second)
tg.Wait(assert, 4, 10*time.Second)

counts := tg.Counts()
assert.Equal(int64(1), counts["datadog.tracer.spans_started"])
assert.Equal(int64(1), counts["datadog.tracer.spans_finished"])
assert.Equal(int64(0), counts["datadog.tracer.traces_dropped"])
assert.Equal(int64(1), counts["datadog.tracer.queue.enqueued.traces"])
}

// TestEnqueuedTracesHealthMetric starts and finishes three spans, flushes, and
// then checks that the "datadog.tracer.queue.enqueued.traces" count recorded by
// the test statsd client equals three and that the agent trace writer's
// tracesQueued counter reads zero afterwards.
func TestEnqueuedTracesHealthMetric(t *testing.T) {
	assert := assert.New(t)
	var tg statsdtest.TestStatsdClient

	// statsInterval is package-level state; restore it when the test ends.
	defer func(old time.Duration) { statsInterval = old }(statsInterval)
	statsInterval = time.Nanosecond

	tracer, _, flush, stop := startTestTracer(t, withStatsdClient(&tg))
	defer stop()

	const traces = 3
	for i := 0; i < traces; i++ {
		tracer.StartSpan("operation").Finish()
	}
	flush(traces)
	tg.Wait(assert, 1, 10*time.Second)

	counts := tg.Counts()
	assert.Equal(int64(traces), counts["datadog.tracer.queue.enqueued.traces"])

	w, ok := tracer.traceWriter.(*agentTraceWriter)
	if !assert.True(ok, "trace writer is not an *agentTraceWriter") {
		// w is nil when the type assertion fails; stop here rather than
		// turning the recorded failure into a nil-pointer panic.
		return
	}
	assert.Equal(uint32(0), w.tracesQueued)
}

func TestTracerMetrics(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func newTracer(opts ...StartOption) *tracer {
t.wg.Add(1)
go func() {
defer t.wg.Done()
t.reportHealthMetrics(statsInterval)
t.reportHealthMetricsAtInterval(statsInterval)
}()
t.stats.Start()
return t
Expand Down
30 changes: 24 additions & 6 deletions ddtrace/tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,18 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
}
req.Header.Set(traceCountHeader, strconv.Itoa(p.itemCount()))
req.Header.Set(headerComputedTopLevel, "yes")
if t, ok := traceinternal.GetGlobalTracer().(*tracer); ok {
if t.config.tracingAsTransport || t.config.canComputeStats() {
var tr *tracer
var haveTracer bool
if tr, haveTracer = traceinternal.GetGlobalTracer().(*tracer); haveTracer {
if tr.config.tracingAsTransport || tr.config.canComputeStats() {
// tracingAsTransport uses this header to disable the trace agent's stats computation
// while making canComputeStats() always false to also disable client stats computation.
req.Header.Set("Datadog-Client-Computed-Stats", "yes")
}
droppedTraces := int(atomic.SwapUint32(&t.droppedP0Traces, 0))
partialTraces := int(atomic.SwapUint32(&t.partialTraces, 0))
droppedSpans := int(atomic.SwapUint32(&t.droppedP0Spans, 0))
if stats := t.statsd; stats != nil {
droppedTraces := int(atomic.SwapUint32(&tr.droppedP0Traces, 0))
partialTraces := int(atomic.SwapUint32(&tr.partialTraces, 0))
droppedSpans := int(atomic.SwapUint32(&tr.droppedP0Spans, 0))
if stats := tr.statsd; stats != nil {
stats.Count("datadog.tracer.dropped_p0_traces", int64(droppedTraces),
[]string{fmt.Sprintf("partial:%s", strconv.FormatBool(partialTraces > 0))}, 1)
stats.Count("datadog.tracer.dropped_p0_spans", int64(droppedSpans), nil, 1)
Expand All @@ -170,9 +172,11 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
}
response, err := t.client.Do(req)
if err != nil {
reportAPIErrorsMetric(haveTracer, response, err, tr)
return nil, err
}
if code := response.StatusCode; code >= 400 {
reportAPIErrorsMetric(haveTracer, response, err, tr)
// error, check the body for context information and
// return a nice error.
msg := make([]byte, 1000)
Expand All @@ -187,6 +191,20 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
return response.Body, nil
}

// reportAPIErrorsMetric increments the "datadog.tracer.api.errors" statsd
// counter, tagged with the reason for the failure:
//   - "reason:server_response_<status code>" when response is non-nil;
//   - "reason:network_failure" when response is nil and err is non-nil.
//
// A non-nil response takes precedence over err. The call is a no-op when
// haveTracer is false, or when t or its statsd client is nil, so that metric
// reporting can never panic on the error path of a send.
func reportAPIErrorsMetric(haveTracer bool, response *http.Response, err error, t *tracer) {
	if !haveTracer || t == nil || t.statsd == nil {
		return
	}
	var reason string
	switch {
	case response != nil:
		reason = fmt.Sprintf("server_response_%d", response.StatusCode)
	case err != nil:
		reason = "network_failure"
	}
	t.statsd.Incr("datadog.tracer.api.errors", []string{"reason:" + reason}, 1)
}

// endpoint returns the URL held in the transport's traceURL field.
func (t *httpTransport) endpoint() string {
return t.traceURL
}
85 changes: 85 additions & 0 deletions ddtrace/tracer/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
traceinternal "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/statsdtest"
)

// getTestSpan returns a Span with different fields set
Expand Down Expand Up @@ -241,6 +243,89 @@ func TestCustomTransport(t *testing.T) {
assert.Equal(hits, 1)
}

// ErrTransport is a stub round tripper whose RoundTrip always fails: it
// returns no response and a non-nil error.
type ErrTransport struct{}

// RoundTrip ignores the request and returns a nil response together with a
// fixed error.
func (*ErrTransport) RoundTrip(_ *http.Request) (*http.Response, error) {
	return nil, fmt.Errorf("error in RoundTripper")
}

// ErrResponseTransport is a stub round tripper whose RoundTrip always
// succeeds at the transport level but answers with status code 400.
type ErrResponseTransport struct{}

// RoundTrip ignores the request and returns a response carrying only the
// 400 status code, with a nil error.
func (*ErrResponseTransport) RoundTrip(_ *http.Request) (*http.Response, error) {
	resp := &http.Response{StatusCode: http.StatusBadRequest}
	return resp, nil
}

// OkTransport is a stub round tripper whose RoundTrip always answers with
// status code 200.
type OkTransport struct{}

// RoundTrip ignores the request and returns a response carrying only the
// 200 status code, with a nil error.
func (*OkTransport) RoundTrip(_ *http.Request) (*http.Response, error) {
	resp := &http.Response{StatusCode: http.StatusOK}
	return resp, nil
}

func TestApiErrorsMetric(t *testing.T) {
t.Run("error", func(t *testing.T) {
assert := assert.New(t)
c := &http.Client{
Transport: &ErrTransport{},
}
var tg statsdtest.TestStatsdClient
trc := newTracer(WithHTTPClient(c), withStatsdClient(&tg))
traceinternal.SetGlobalTracer(trc)
defer trc.Stop()

p, err := encode(getTestTrace(1, 1))
assert.NoError(err)

// We're expecting an error
_, err = trc.config.transport.send(p)
assert.Error(err)
calls := statsdtest.FilterCallsByName(tg.IncrCalls(), "datadog.tracer.api.errors")
assert.Len(calls, 1)
call := calls[0]
assert.Equal([]string{"reason:network_failure"}, call.Tags())

})
t.Run("response with err code", func(t *testing.T) {
assert := assert.New(t)
c := &http.Client{
Transport: &ErrResponseTransport{},
}
var tg statsdtest.TestStatsdClient
trc := newTracer(WithHTTPClient(c), withStatsdClient(&tg))
traceinternal.SetGlobalTracer(trc)
defer trc.Stop()

p, err := encode(getTestTrace(1, 1))
assert.NoError(err)

_, err = trc.config.transport.send(p)
assert.Error(err)

calls := statsdtest.FilterCallsByName(tg.IncrCalls(), "datadog.tracer.api.errors")
assert.Len(calls, 1)
call := calls[0]
assert.Equal([]string{"reason:server_response_400"}, call.Tags())
})
t.Run("successful send - no metric", func(t *testing.T) {
assert := assert.New(t)
var tg statsdtest.TestStatsdClient
c := &http.Client{
Transport: &OkTransport{},
}
trc := newTracer(WithHTTPClient(c), withStatsdClient(&tg))
traceinternal.SetGlobalTracer(trc)
defer trc.Stop()

p, err := encode(getTestTrace(1, 1))
assert.NoError(err)

_, err = trc.config.transport.send(p)
assert.NoError(err)

calls := statsdtest.FilterCallsByName(tg.IncrCalls(), "datadog.tracer.api.errors")
assert.Len(calls, 0)
})
}

func TestWithHTTPClient(t *testing.T) {
// disable instrumentation telemetry to prevent flaky number of requests
t.Setenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "false")
Expand Down
5 changes: 5 additions & 0 deletions ddtrace/tracer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"

globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal"
Expand Down Expand Up @@ -50,6 +51,8 @@ type agentTraceWriter struct {

// statsd is used to send metrics
statsd globalinternal.StatsdClient

tracesQueued uint32
}

func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient globalinternal.StatsdClient) *agentTraceWriter {
Expand All @@ -67,6 +70,7 @@ func (h *agentTraceWriter) add(trace []*span) {
h.statsd.Incr("datadog.tracer.traces_dropped", []string{"reason:encoding_error"}, 1)
log.Error("Error encoding msgpack: %v", err)
}
atomic.AddUint32(&h.tracesQueued, 1) // TODO: This does not differentiate between complete traces and partial chunks
if h.payload.size() > payloadSizeLimit {
h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:size"}, 1)
h.flush()
Expand Down Expand Up @@ -94,6 +98,7 @@ func (h *agentTraceWriter) flush() {
// collection to avoid a memory leak when references to this object
// may still be kept by faulty transport implementations or the
// standard library. See dd-trace-go#976
h.statsd.Count("datadog.tracer.queue.enqueued.traces", int64(atomic.SwapUint32(&h.tracesQueued, 0)), nil, 1)
p.clear()

<-h.climit
Expand Down
Loading

0 comments on commit 3e5a1fe

Please sign in to comment.