From dbdc01ac0efb7479d6af0087bfe259a15b84bd4f Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Mon, 9 Dec 2024 12:06:09 +0100 Subject: [PATCH] emit the metric on every heartbeat --- ddtrace/tracer/telemetry.go | 70 +++++++++++++++++++------------ internal/telemetry/client.go | 40 +++++++++++++++++- internal/telemetry/client_test.go | 60 ++++++++++++++++++++++++++ internal/telemetry/option.go | 11 +++++ 4 files changed, 152 insertions(+), 29 deletions(-) diff --git a/ddtrace/tracer/telemetry.go b/ddtrace/tracer/telemetry.go index dbaacdaa08..d871adf971 100644 --- a/ddtrace/tracer/telemetry.go +++ b/ddtrace/tracer/telemetry.go @@ -33,15 +33,7 @@ func startTelemetry(c *config) { // Do not do extra work populating config data if instrumentation telemetry is disabled. return } - telemetry.GlobalClient.ApplyOps( - telemetry.WithService(c.serviceName), - telemetry.WithEnv(c.env), - telemetry.WithHTTPClient(c.httpClient), - // c.logToStdout is true if serverless is turned on - // c.ciVisibilityAgentless is true if ci visibility mode is turned on and agentless writer is configured - telemetry.WithURL(c.logToStdout || c.ciVisibilityAgentless, c.agentURL.String()), - telemetry.WithVersion(c.version), - ) + telemetryConfigs := []telemetry.Configuration{ {Name: "trace_debug_enabled", Value: c.debug}, {Name: "agent_feature_drop_p0s", Value: c.agent.DropP0s}, @@ -73,6 +65,38 @@ func startTelemetry(c *config) { c.traceSampleRules.toTelemetry(), telemetry.Sanitize(telemetry.Configuration{Name: "span_sample_rules", Value: c.spanRules}), } + + // Process orchestrion enablement metric emission... 
+ const orchestrionEnabledMetric = "orchestrion.enabled" + var ( + orchestrionEnabledValue float64 + orchestrionEnabledTags []string + ) + if c.orchestrionCfg.Enabled { + orchestrionEnabledValue = 1 + orchestrionEnabledTags = make([]string, 0, len(c.orchestrionCfg.Metadata)) + for k, v := range c.orchestrionCfg.Metadata { + telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "orchestrion_" + k, Value: v}) + orchestrionEnabledTags = append(orchestrionEnabledTags, k+":"+v) + } + if testing.Testing() { + // In tests, ensure tags are consistently ordered... Ordering is irrelevant outside of tests. + slices.Sort(orchestrionEnabledTags) + } + } + + // Apply the GlobalClient options... + telemetry.GlobalClient.ApplyOps( + telemetry.WithService(c.serviceName), + telemetry.WithEnv(c.env), + telemetry.WithHTTPClient(c.httpClient), + // c.logToStdout is true if serverless is turned on + // c.ciVisibilityAgentless is true if ci visibility mode is turned on and agentless writer is configured + telemetry.WithURL(c.logToStdout || c.ciVisibilityAgentless, c.agentURL.String()), + telemetry.WithVersion(c.version), + telemetry.WithHeartbeatMetric(telemetry.NamespaceTracers, telemetry.MetricKindGauge, orchestrionEnabledMetric, func() float64 { return orchestrionEnabledValue }, orchestrionEnabledTags, false), + ) + var peerServiceMapping []string for key, value := range c.peerServiceMappings { peerServiceMapping = append(peerServiceMapping, fmt.Sprintf("%s:%s", key, value)) @@ -109,24 +133,16 @@ func startTelemetry(c *config) { telemetry.Configuration{Name: fmt.Sprintf("sr_%s_(%s)_(%s)", rule.ruleType.String(), service, name), Value: fmt.Sprintf("rate:%f_maxPerSecond:%f", rule.Rate, rule.MaxPerSecond)}) } - if c.orchestrionCfg.Enabled { - tags := make([]string, 0, len(c.orchestrionCfg.Metadata)) - for k, v := range c.orchestrionCfg.Metadata { - telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "orchestrion_" + k, Value: v}) - tags = 
append(tags, k+":"+v) - } - if testing.Testing() { - // In tests, ensure tags are consistently ordered... - slices.Sort(tags) - } - telemetry.GlobalClient.Record( - telemetry.NamespaceTracers, - telemetry.MetricKindGauge, - "orchestrion.enabled", 1, - tags, - false, // Go-specific - ) - } + + // Submit the initial metric tick + telemetry.GlobalClient.Record( + telemetry.NamespaceTracers, + telemetry.MetricKindGauge, + orchestrionEnabledMetric, orchestrionEnabledValue, + orchestrionEnabledTags, + false, // Go-specific + ) + telemetryConfigs = append(telemetryConfigs, additionalConfigs...) telemetry.GlobalClient.ProductChange(telemetry.NamespaceTracers, true, telemetryConfigs) } diff --git a/internal/telemetry/client.go b/internal/telemetry/client.go index 7a94be85cd..9495620c06 100644 --- a/internal/telemetry/client.go +++ b/internal/telemetry/client.go @@ -147,6 +147,19 @@ type client struct { // Globally registered application configuration sent in the app-started request, along with the locally-defined // configuration of the event. globalAppConfig []Configuration + + // heartbeatMetrics is a set of metrics to be emitted each time a heartbeat is sent. + heartbeatMetrics []heartbeatMetric +} + +// heartbeatMetric is a metric that is emitted each time a heartbeat is sent. +type heartbeatMetric struct { + namespace Namespace + kind MetricKind + name string + value func() float64 // Called to determine the current value of the metric. + tags []string + common bool } func log(msg string, args ...interface{}) { @@ -338,14 +351,22 @@ func metricKey(name string, tags []string, kind MetricKind) string { return name + string(kind) + strings.Join(tags, "-") } -// Record sets the value for a gauge or distribution metric type -// with the given name and tags. If the metric is not language-specific, common should be set to true +// Record sets the value for a gauge or distribution metric type with the given +// name and tags. 
If the metric is not language-specific, common should be set +// to true. func (c *client) Record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) { c.mu.Lock() defer c.mu.Unlock() if !c.started { return } + c.record(namespace, kind, name, value, tags, common) +} + +// record sets the value for a gauge or distribution metric type with the given +// name and tags. If the metric is not language-specific, common should be set +// to true. Must be called with c.mu locked. +func (c *client) record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) { if _, ok := c.metrics[namespace]; !ok { c.metrics[namespace] = map[string]*metric{} } @@ -606,7 +627,22 @@ func (c *client) backgroundHeartbeat() { if !c.started { return } + + // Emit all the metrics that were registered for heartbeat. + c.emitHeartbeatMetrics() + + // Send the actual app heartbeat. c.scheduleSubmit(c.newRequest(RequestTypeAppHeartbeat)) c.flush(false) c.heartbeatT.Reset(c.heartbeatInterval) } + +// emitHeartbeatMetrics is invoked as part of each heartbeat tick, and is +// responsible for emitting periodic metrics that are expected to be sent +// throughout the lifetime of the service. These are typically gauge metrics. +// Must be called with c.mu locked. +func (c *client) emitHeartbeatMetrics() { + for _, m := range c.heartbeatMetrics { + c.record(m.namespace, m.kind, m.name, m.value(), m.tags, m.common) + } +} diff --git a/internal/telemetry/client_test.go b/internal/telemetry/client_test.go index 556157e38f..a75e2ec3a7 100644 --- a/internal/telemetry/client_test.go +++ b/internal/telemetry/client_test.go @@ -52,7 +52,67 @@ func TestClient(t *testing.T) { t.Fatal("Heartbeat took more than 30 seconds. 
Should have been ~1 second") case <-heartbeat: } +} +func TestHeartbeatMetric(t *testing.T) { + t.Setenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "1") + heartbeat := make(chan struct{}) + metrics := make(chan string) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h := r.Header.Get("DD-Telemetry-Request-Type") + if len(h) == 0 { + t.Fatal("didn't get telemetry request type header") + } + switch RequestType(h) { + case RequestTypeAppHeartbeat: + select { + case heartbeat <- struct{}{}: + default: + } + case RequestTypeGenerateMetrics: + var data struct { + Payload *Metrics + } + if err := json.NewDecoder(r.Body).Decode(&data); err != nil { + t.Fatal(err) + } + for _, s := range data.Payload.Series { + select { + case metrics <- s.Metric: + default: + } + } + } + })) + defer server.Close() + + client := &client{ + URL: server.URL, + } + const metricName = "test.metric" + client.ApplyOps(WithHeartbeatMetric(NamespaceGeneral, MetricKindGauge, metricName, func() float64 { return 1 }, nil, false)) + + client.mu.Lock() + client.start(nil, NamespaceTracers, true) + client.start(nil, NamespaceTracers, true) // test idempotence + client.mu.Unlock() + defer client.Stop() + + timeout := time.After(30 * time.Second) + waitingForHeartbeat := true + waitingForMetric := true + for waitingForHeartbeat || waitingForMetric { + select { + case <-timeout: + t.Fatal("Heartbeat took more than 30 seconds. Should have been ~1 second") + case <-heartbeat: + waitingForHeartbeat = false + case m := <-metrics: + assert.Equal(t, metricName, m) + waitingForMetric = false + } + } } func TestMetrics(t *testing.T) { diff --git a/internal/telemetry/option.go b/internal/telemetry/option.go index 8320d1a5fb..2735958117 100644 --- a/internal/telemetry/option.go +++ b/internal/telemetry/option.go @@ -56,6 +56,17 @@ func WithVersion(version string) Option { } } +// WithHeartbeatMetric registers a metric data point to be emitted at each +// heartbeat tick. 
This is useful to maintain gauge metrics at a specific level. +func WithHeartbeatMetric(namespace Namespace, kind MetricKind, name string, value func() float64, tags []string, common bool) Option { + return func(client *client) { + client.heartbeatMetrics = append( + client.heartbeatMetrics, + heartbeatMetric{namespace, kind, name, value, tags, common}, + ) + } +} + // WithHTTPClient specifies the http client for the telemetry client func WithHTTPClient(httpClient *http.Client) Option { return func(client *client) {