Skip to content

Commit

Permalink
emit the metric on every heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
RomainMuller committed Dec 9, 2024
1 parent 64f7fe1 commit dbdc01a
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 29 deletions.
70 changes: 43 additions & 27 deletions ddtrace/tracer/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,7 @@ func startTelemetry(c *config) {
// Do not do extra work populating config data if instrumentation telemetry is disabled.
return
}
telemetry.GlobalClient.ApplyOps(
telemetry.WithService(c.serviceName),
telemetry.WithEnv(c.env),
telemetry.WithHTTPClient(c.httpClient),
// c.logToStdout is true if serverless is turned on
// c.ciVisibilityAgentless is true if ci visibility mode is turned on and agentless writer is configured
telemetry.WithURL(c.logToStdout || c.ciVisibilityAgentless, c.agentURL.String()),
telemetry.WithVersion(c.version),
)

telemetryConfigs := []telemetry.Configuration{
{Name: "trace_debug_enabled", Value: c.debug},
{Name: "agent_feature_drop_p0s", Value: c.agent.DropP0s},
Expand Down Expand Up @@ -73,6 +65,38 @@ func startTelemetry(c *config) {
c.traceSampleRules.toTelemetry(),
telemetry.Sanitize(telemetry.Configuration{Name: "span_sample_rules", Value: c.spanRules}),
}

// Process orchestrion enablement metric emission...
const orchestrionEnabledMetric = "orchestrion.enabled"
var (
orchestrionEnabledValue float64
orchestrionEnabledTags []string
)
if c.orchestrionCfg.Enabled {
orchestrionEnabledValue = 1
orchestrionEnabledTags = make([]string, 0, len(c.orchestrionCfg.Metadata))
for k, v := range c.orchestrionCfg.Metadata {
telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "orchestrion_" + k, Value: v})
orchestrionEnabledTags = append(orchestrionEnabledTags, k+":"+v)
}
if testing.Testing() {
// In tests, ensure tags are consistently ordered... Ordering is irrelevant outside of tests.
slices.Sort(orchestrionEnabledTags)
}
}

// Apply the GlobalClient options...
telemetry.GlobalClient.ApplyOps(
telemetry.WithService(c.serviceName),
telemetry.WithEnv(c.env),
telemetry.WithHTTPClient(c.httpClient),
// c.logToStdout is true if serverless is turned on
// c.ciVisibilityAgentless is true if ci visibility mode is turned on and agentless writer is configured
telemetry.WithURL(c.logToStdout || c.ciVisibilityAgentless, c.agentURL.String()),
telemetry.WithVersion(c.version),
telemetry.WithHeartbeatMetric(telemetry.NamespaceTracers, telemetry.MetricKindGauge, orchestrionEnabledMetric, func() float64 { return orchestrionEnabledValue }, orchestrionEnabledTags, false),
)

var peerServiceMapping []string
for key, value := range c.peerServiceMappings {
peerServiceMapping = append(peerServiceMapping, fmt.Sprintf("%s:%s", key, value))
Expand Down Expand Up @@ -109,24 +133,16 @@ func startTelemetry(c *config) {
telemetry.Configuration{Name: fmt.Sprintf("sr_%s_(%s)_(%s)", rule.ruleType.String(), service, name),
Value: fmt.Sprintf("rate:%f_maxPerSecond:%f", rule.Rate, rule.MaxPerSecond)})
}
if c.orchestrionCfg.Enabled {
tags := make([]string, 0, len(c.orchestrionCfg.Metadata))
for k, v := range c.orchestrionCfg.Metadata {
telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "orchestrion_" + k, Value: v})
tags = append(tags, k+":"+v)
}
if testing.Testing() {
// In tests, ensure tags are consistently ordered...
slices.Sort(tags)
}
telemetry.GlobalClient.Record(
telemetry.NamespaceTracers,
telemetry.MetricKindGauge,
"orchestrion.enabled", 1,
tags,
false, // Go-specific
)
}

// Submit the initial metric tick
telemetry.GlobalClient.Record(
telemetry.NamespaceTracers,
telemetry.MetricKindGauge,
orchestrionEnabledMetric, orchestrionEnabledValue,
orchestrionEnabledTags,
false, // Go-specific
)

telemetryConfigs = append(telemetryConfigs, additionalConfigs...)
telemetry.GlobalClient.ProductChange(telemetry.NamespaceTracers, true, telemetryConfigs)
}
40 changes: 38 additions & 2 deletions internal/telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,19 @@ type client struct {
// Globally registered application configuration sent in the app-started request, along with the locally-defined
// configuration of the event.
globalAppConfig []Configuration

// heartbeatMetrics is a set of metrics to be emitted each time a heartbeat is sent.
heartbeatMetrics []heartbeatMetric
}

// heartbeatMetric is a metric that is emitted each time a heartbeat is sent.
// Entries are registered with WithHeartbeatMetric and replayed on every
// heartbeat tick by emitHeartbeatMetrics, which forwards each field to
// client.record.
type heartbeatMetric struct {
// namespace is the telemetry namespace the metric is recorded under.
namespace Namespace
// kind is the metric kind handed to record (e.g. MetricKindGauge).
kind MetricKind
// name is the metric name.
name string
value func() float64 // Called to determine the current value of the metric.
// tags are the tags attached to every emitted data point.
tags []string
// common is passed through to record; per Record's documentation it should
// be true when the metric is not language-specific.
common bool
}

func log(msg string, args ...interface{}) {
Expand Down Expand Up @@ -338,14 +351,22 @@ func metricKey(name string, tags []string, kind MetricKind) string {
return name + string(kind) + strings.Join(tags, "-")
}

// Record sets the value for a gauge or distribution metric type
// with the given name and tags. If the metric is not language-specific, common should be set to true
// Record sets the value for a gauge or distribution metric type with the given
// name and tags. If the metric is not language-specific, common should be set
// to true. Calls made while the client is not started are ignored.
func (c *client) Record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// record requires c.mu to be held, which the deferred unlock guarantees
	// for the whole call.
	if c.started {
		c.record(namespace, kind, name, value, tags, common)
	}
}

// record sets the value for a gauge or distribution metric type with the given
// name and tags. If the metric is not language-specific, common should be set
// to true. Must be called with c.mu locked.
func (c *client) record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) {
if _, ok := c.metrics[namespace]; !ok {
c.metrics[namespace] = map[string]*metric{}
}
Expand Down Expand Up @@ -606,7 +627,22 @@ func (c *client) backgroundHeartbeat() {
if !c.started {
return
}

// Emit all the metrics that were registered for heartbeat.
c.emitHeartbeatMetrics()

// Send the actual app heartbeat.
c.scheduleSubmit(c.newRequest(RequestTypeAppHeartbeat))
c.flush(false)
c.heartbeatT.Reset(c.heartbeatInterval)
}

// emitHeartbeatMetrics runs on every heartbeat tick and records one data
// point for each metric registered for heartbeat emission, asking each
// metric's value function for its current value. These are typically gauge
// metrics that are expected to be sent throughout the lifetime of the service.
// Must be called with c.mu locked.
func (c *client) emitHeartbeatMetrics() {
	registered := c.heartbeatMetrics
	for i := range registered {
		hb := registered[i]
		current := hb.value()
		c.record(hb.namespace, hb.kind, hb.name, current, hb.tags, hb.common)
	}
}
60 changes: 60 additions & 0 deletions internal/telemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,67 @@ func TestClient(t *testing.T) {
t.Fatal("Heartbeat took more than 30 seconds. Should have been ~1 second")
case <-heartbeat:
}
}

// TestHeartbeatMetric verifies that a metric registered through
// WithHeartbeatMetric is submitted by a started client, and that the client
// keeps sending app-heartbeat requests. It passes once both a heartbeat
// request and a generate-metrics request carrying the metric were observed.
func TestHeartbeatMetric(t *testing.T) {
	t.Setenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "1")
	heartbeat := make(chan struct{})
	metrics := make(chan string)

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// This handler runs on the HTTP server's goroutines. t.Fatal/FailNow
		// may only be called from the goroutine running the test, so failures
		// are reported with t.Error followed by an early return instead.
		h := r.Header.Get("DD-Telemetry-Request-Type")
		if len(h) == 0 {
			t.Error("didn't get telemetry request type header")
			return
		}
		switch RequestType(h) {
		case RequestTypeAppHeartbeat:
			// Non-blocking send: the test may already have seen a heartbeat
			// and stopped listening on this channel.
			select {
			case heartbeat <- struct{}{}:
			default:
			}
		case RequestTypeGenerateMetrics:
			var data struct {
				Payload *Metrics
			}
			if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
				t.Error(err)
				return
			}
			if data.Payload == nil {
				// Guard against a nil dereference on a body without payload.
				t.Error("generate-metrics request did not carry a payload")
				return
			}
			for _, s := range data.Payload.Series {
				// Non-blocking send, for the same reason as above.
				select {
				case metrics <- s.Metric:
				default:
				}
			}
		}
	}))
	defer server.Close()

	client := &client{
		URL: server.URL,
	}
	const metricName = "test.metric"
	client.ApplyOps(WithHeartbeatMetric(NamespaceGeneral, MetricKindGauge, metricName, func() float64 { return 1 }, nil, false))

	client.mu.Lock()
	client.start(nil, NamespaceTracers, true)
	client.start(nil, NamespaceTracers, true) // test idempotence
	client.mu.Unlock()
	defer client.Stop()

	timeout := time.After(30 * time.Second)
	waitingForHeartbeat := true
	waitingForMetric := true
	for waitingForHeartbeat || waitingForMetric {
		select {
		case <-timeout:
			// Report which of the two signals is still missing.
			t.Fatalf(
				"Timed out after 30 seconds (still waiting for heartbeat: %t, for metric: %t). Should have been ~1 second",
				waitingForHeartbeat, waitingForMetric,
			)
		case <-heartbeat:
			waitingForHeartbeat = false
		case m := <-metrics:
			assert.Equal(t, metricName, m)
			waitingForMetric = false
		}
	}
}

func TestMetrics(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions internal/telemetry/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ func WithVersion(version string) Option {
}
}

// WithHeartbeatMetric registers a metric data point to be emitted at each
// heartbeat tick. This is useful to maintain gauge metrics at a specific level.
//
// value is called on every heartbeat to obtain the current data point; the
// remaining arguments are forwarded to the client's record method and have the
// same meaning as for Record. The heartbeat emission runs with the client's
// lock held, so value should not call back into the telemetry client.
//
// A nil value function cannot produce a data point and would cause a nil
// function call when the heartbeat fires, so such a registration is ignored.
func WithHeartbeatMetric(namespace Namespace, kind MetricKind, name string, value func() float64, tags []string, common bool) Option {
	return func(client *client) {
		if value == nil {
			// Telemetry must never crash the host application.
			return
		}
		client.heartbeatMetrics = append(client.heartbeatMetrics, heartbeatMetric{
			namespace: namespace,
			kind:      kind,
			name:      name,
			value:     value,
			tags:      tags,
			common:    common,
		})
	}
}

// WithHTTPClient specifies the http client for the telemetry client
func WithHTTPClient(httpClient *http.Client) Option {
return func(client *client) {
Expand Down

0 comments on commit dbdc01a

Please sign in to comment.