Skip to content

Commit

Permalink
fix app-extended-heartbeat payload way of being sent each 24 hours
Browse files Browse the repository at this point in the history
Signed-off-by: Eliott Bouhana <[email protected]>
  • Loading branch information
eliottness committed Jan 24, 2025
1 parent 8a6e71c commit ca1835b
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 94 deletions.
2 changes: 1 addition & 1 deletion internal/newtelemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newClient(tracerConfig internal.TracerConfig, config ClientConfig) (*client
tracerConfig: tracerConfig,
writer: writer,
clientConfig: config,
flushMapper: mapper.NewDefaultMapper(config.HeartbeatInterval),
flushMapper: mapper.NewDefaultMapper(config.HeartbeatInterval, config.ExtendedHeartbeatInterval),
// This means that, by default, we incur dataloss if we spend ~30mins without flushing, considering we send telemetry data this looks reasonable.
// This also means that in the worst case scenario, memory-wise, the app is stabilized after running for 30mins.
payloadQueue: internal.NewRingQueue[transport.Payload](4, 32),
Expand Down
10 changes: 10 additions & 0 deletions internal/newtelemetry/client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type ClientConfig struct {
// The maximum value is 60s.
HeartbeatInterval time.Duration

// ExtendedHeartbeatInterval is the interval at which to send an extended heartbeat payload, defaults to 24h.
ExtendedHeartbeatInterval time.Duration

// FlushIntervalRange is the interval at which the client flushes the data.
// By default, the client will start to Flush at 60s intervals and will reduce the interval based on the load till it hit 15s
// Both values cannot be higher than 60s because the heartbeat need to be sent at least every 60s.
Expand Down Expand Up @@ -75,6 +78,9 @@ const (
// defaultHeartbeatInterval is the default interval at which the agent sends a heartbeat.
defaultHeartbeatInterval = 60 // seconds

// defaultExtendedHeartbeatInterval is the default interval at which the agent sends an extended heartbeat.
defaultExtendedHeartbeatInterval = 24 * time.Hour

// defaultMinFlushInterval is the default interval at which the client flushes the data.
defaultMinFlushInterval = 15.0 * time.Second

Expand Down Expand Up @@ -158,6 +164,10 @@ func defaultConfig(config ClientConfig) ClientConfig {
config.EarlyFlushPayloadSize = defaultEarlyFlushPayloadSize
}

if config.ExtendedHeartbeatInterval == 0 {
config.ExtendedHeartbeatInterval = defaultExtendedHeartbeatInterval
}

return config
}

Expand Down
172 changes: 102 additions & 70 deletions internal/newtelemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,34 +108,40 @@ func TestClientFlush(t *testing.T) {
{
name: "extended-heartbeat-config",
clientConfig: ClientConfig{
HeartbeatInterval: time.Nanosecond,
HeartbeatInterval: time.Nanosecond,
ExtendedHeartbeatInterval: time.Nanosecond,
},
when: func(c *client) {
c.AddAppConfig("key", "value", types.OriginDefault)
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppExtendedHeartbeat{}, payload)
heartbeat := payload.(transport.AppExtendedHeartbeat)
assert.Len(t, heartbeat.Configuration, 1)
assert.Equal(t, heartbeat.Configuration[0].Name, "key")
assert.Equal(t, heartbeat.Configuration[0].Value, "value")
assert.Equal(t, heartbeat.Configuration[0].Origin, types.OriginDefault)
require.IsType(t, transport.MessageBatch{}, payload)
batch := payload.(transport.MessageBatch)
require.Len(t, batch.Payload, 2)
assert.Equal(t, transport.RequestTypeAppClientConfigurationChange, batch.Payload[0].RequestType)
assert.Equal(t, transport.RequestTypeAppExtendedHeartBeat, batch.Payload[1].RequestType)

assert.Len(t, batch.Payload[1].Payload.(transport.AppExtendedHeartbeat).Configuration, 0)
},
},
{
name: "extended-heartbeat-integrations",
clientConfig: ClientConfig{
HeartbeatInterval: time.Nanosecond,
HeartbeatInterval: time.Nanosecond,
ExtendedHeartbeatInterval: time.Nanosecond,
},
when: func(c *client) {
c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0"})
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppExtendedHeartbeat{}, payload)
heartbeat := payload.(transport.AppExtendedHeartbeat)
assert.Len(t, heartbeat.Integrations, 1)
assert.Equal(t, heartbeat.Integrations[0].Name, "test-integration")
assert.Equal(t, heartbeat.Integrations[0].Version, "1.0.0")
require.IsType(t, transport.MessageBatch{}, payload)
batch := payload.(transport.MessageBatch)
require.Len(t, batch.Payload, 2)
assert.Equal(t, transport.RequestTypeAppIntegrationsChange, batch.Payload[0].RequestType)
assert.Equal(t, transport.RequestTypeAppExtendedHeartBeat, batch.Payload[1].RequestType)
assert.Len(t, batch.Payload[1].Payload.(transport.AppExtendedHeartbeat).Integrations, 1)
assert.Equal(t, batch.Payload[1].Payload.(transport.AppExtendedHeartbeat).Integrations[0].Name, "test-integration")
assert.Equal(t, batch.Payload[1].Payload.(transport.AppExtendedHeartbeat).Integrations[0].Version, "1.0.0")
},
},
{
Expand Down Expand Up @@ -241,6 +247,7 @@ func TestClientFlush(t *testing.T) {
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.MessageBatch{}, payload)
batch := payload.(transport.MessageBatch)
assert.Len(t, batch.Payload, 2)
for _, payload := range batch.Payload {
switch p := payload.Payload.(type) {
case transport.AppProductChange:
Expand All @@ -259,6 +266,39 @@ func TestClientFlush(t *testing.T) {
}
},
},
{
name: "product+integration+heartbeat",
clientConfig: ClientConfig{
HeartbeatInterval: time.Nanosecond,
},
when: func(c *client) {
c.ProductStarted("test-product")
c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0"})
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.MessageBatch{}, payload)
batch := payload.(transport.MessageBatch)
assert.Len(t, batch.Payload, 3)
for _, payload := range batch.Payload {
switch p := payload.Payload.(type) {
case transport.AppProductChange:
assert.Equal(t, transport.RequestTypeAppProductChange, payload.RequestType)
assert.Len(t, p.Products, 1)
assert.True(t, p.Products[types.Namespace("test-product")].Enabled)
case transport.AppIntegrationChange:
assert.Equal(t, transport.RequestTypeAppIntegrationsChange, payload.RequestType)
assert.Len(t, p.Integrations, 1)
assert.Equal(t, p.Integrations[0].Name, "test-integration")
assert.Equal(t, p.Integrations[0].Version, "1.0.0")
assert.True(t, p.Integrations[0].Enabled)
case transport.AppHeartbeat:
assert.Equal(t, transport.RequestTypeAppHeartbeat, payload.RequestType)
default:
t.Fatalf("unexpected payload type: %T", p)
}
}
},
},
{
name: "app-started",
when: func(c *client) {
Expand Down Expand Up @@ -305,24 +345,17 @@ func TestClientFlush(t *testing.T) {
c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0"})
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.MessageBatch{}, payload)
batch := payload.(transport.MessageBatch)

// Check AppStarted is the first payload in MessageBatch
assert.IsType(t, transport.AppStarted{}, batch.Payload[0].Payload)

for _, payload := range batch.Payload {
switch p := payload.Payload.(type) {
case transport.AppStarted:
assert.Equal(t, transport.RequestTypeAppStarted, payload.RequestType)
case transport.AppIntegrationChange:
assert.Equal(t, transport.RequestTypeAppIntegrationsChange, payload.RequestType)
assert.Len(t, p.Integrations, 1)
assert.Equal(t, p.Integrations[0].Name, "test-integration")
assert.Equal(t, p.Integrations[0].Version, "1.0.0")
default:
t.Fatalf("unexpected payload type: %T", p)
}
switch p := payload.(type) {
case transport.AppStarted:
assert.Equal(t, globalconfig.InstrumentationInstallID(), p.InstallSignature.InstallID)
assert.Equal(t, globalconfig.InstrumentationInstallType(), p.InstallSignature.InstallType)
assert.Equal(t, globalconfig.InstrumentationInstallTime(), p.InstallSignature.InstallTime)
case transport.AppIntegrationChange:
assert.Len(t, p.Integrations, 1)
assert.Equal(t, p.Integrations[0].Name, "test-integration")
assert.Equal(t, p.Integrations[0].Version, "1.0.0")
default:
t.Fatalf("unexpected payload type: %T", p)
}
},
},
Expand All @@ -335,21 +368,14 @@ func TestClientFlush(t *testing.T) {
c.appStart()
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.MessageBatch{}, payload)
batch := payload.(transport.MessageBatch)

// Check AppStarted is the first payload in MessageBatch
assert.IsType(t, transport.AppStarted{}, batch.Payload[0].Payload)

for _, payload := range batch.Payload {
switch p := payload.Payload.(type) {
case transport.AppStarted:
assert.Equal(t, transport.RequestTypeAppStarted, payload.RequestType)
case transport.AppHeartbeat:
assert.Equal(t, transport.RequestTypeAppHeartbeat, payload.RequestType)
default:
t.Fatalf("unexpected payload type: %T", p)
}
switch p := payload.(type) {
case transport.AppStarted:
assert.Equal(t, globalconfig.InstrumentationInstallID(), p.InstallSignature.InstallID)
assert.Equal(t, globalconfig.InstrumentationInstallType(), p.InstallSignature.InstallType)
assert.Equal(t, globalconfig.InstrumentationInstallTime(), p.InstallSignature.InstallTime)
case transport.AppHeartbeat:
default:
t.Fatalf("unexpected payload type: %T", p)
}
},
},
Expand Down Expand Up @@ -404,13 +430,6 @@ func TestClientEnd2End(t *testing.T) {
Env: "test-env",
Version: "1.0.0",
}
clientConfig := ClientConfig{
AgentURL: "http://localhost:8126",
HTTPClient: &http.Client{
Timeout: 5 * time.Second,
},
Debug: true,
}

parseRequest := func(t *testing.T, request *http.Request) transport.Body {
assert.Equal(t, "v2", request.Header.Get("DD-Telemetry-API-Version"))
Expand All @@ -428,6 +447,7 @@ func TestClientEnd2End(t *testing.T) {
var body transport.Body
require.NoError(t, json.NewDecoder(request.Body).Decode(&body))

assert.Equal(t, string(body.RequestType), request.Header.Get("DD-Telemetry-Request-Type"))
assert.Equal(t, "test-service", body.Application.ServiceName)
assert.Equal(t, "test-env", body.Application.Env)
assert.Equal(t, "1.0.0", body.Application.ServiceVersion)
Expand All @@ -445,7 +465,7 @@ func TestClientEnd2End(t *testing.T) {
assert.Equal(t, true, body.Debug)
assert.Equal(t, "v2", body.APIVersion)
assert.NotZero(t, body.TracerTime)
assert.EqualValues(t, 1, body.SeqID)
assert.LessOrEqual(t, int64(1), body.SeqID)
assert.Equal(t, globalconfig.RuntimeID(), body.RuntimeID)

return body
Expand All @@ -462,7 +482,7 @@ func TestClientEnd2End(t *testing.T) {
c.appStart()
},
expect: func(t *testing.T, request *http.Request) (*http.Response, error) {
assert.EqualValues(t, transport.RequestTypeAppStarted, request.Header.Get("DD-Telemetry-Request-Type"))
assert.Equal(t, string(transport.RequestTypeAppStarted), request.Header.Get("DD-Telemetry-Request-Type"))
body := parseRequest(t, request)
assert.Equal(t, transport.RequestTypeAppStarted, body.RequestType)
return &http.Response{
Expand All @@ -476,7 +496,7 @@ func TestClientEnd2End(t *testing.T) {
c.appStop()
},
expect: func(t *testing.T, request *http.Request) (*http.Response, error) {
assert.EqualValues(t, transport.RequestTypeAppClosing, request.Header.Get("DD-Telemetry-Request-Type"))
assert.Equal(t, string(transport.RequestTypeAppClosing), request.Header.Get("DD-Telemetry-Request-Type"))
body := parseRequest(t, request)
assert.Equal(t, transport.RequestTypeAppClosing, body.RequestType)
return &http.Response{
Expand All @@ -491,14 +511,19 @@ func TestClientEnd2End(t *testing.T) {
c.appStop()
},
expect: func(t *testing.T, request *http.Request) (*http.Response, error) {
assert.EqualValues(t, transport.RequestTypeMessageBatch, request.Header.Get("DD-Telemetry-Request-Type"))
body := parseRequest(t, request)
assert.Equal(t, transport.RequestTypeMessageBatch, body.RequestType)

payload := body.Payload.(transport.MessageBatch)
assert.Len(t, payload.Payload, 2)
assert.Equal(t, transport.RequestTypeAppStarted, payload.Payload[0].RequestType)
assert.Equal(t, transport.RequestTypeAppClosing, payload.Payload[1].RequestType)
switch request.Header.Get("DD-Telemetry-Request-Type") {
case string(transport.RequestTypeAppStarted):
payload := body.Payload.(*transport.AppStarted)
assert.Equal(t, globalconfig.InstrumentationInstallID(), payload.InstallSignature.InstallID)
assert.Equal(t, globalconfig.InstrumentationInstallType(), payload.InstallSignature.InstallType)
assert.Equal(t, globalconfig.InstrumentationInstallTime(), payload.InstallSignature.InstallTime)
case string(transport.RequestTypeAppClosing):

default:
t.Fatalf("unexpected request type: %s", request.Header.Get("DD-Telemetry-Request-Type"))
}

return &http.Response{
StatusCode: http.StatusOK,
Expand All @@ -508,15 +533,22 @@ func TestClientEnd2End(t *testing.T) {
} {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
clientConfig.HTTPClient.Transport = &testRoundTripper{
roundTrip: func(req *http.Request) (*http.Response, error) {
if test.expect != nil {
return test.expect(t, req)
}
return &http.Response{
StatusCode: http.StatusOK,
}, nil
clientConfig := ClientConfig{
AgentURL: "http://localhost:8126",
HTTPClient: &http.Client{
Timeout: 5 * time.Second,
Transport: &testRoundTripper{
roundTrip: func(req *http.Request) (*http.Response, error) {
if test.expect != nil {
return test.expect(t, req)
}
return &http.Response{
StatusCode: http.StatusOK,
}, nil
},
},
},
Debug: true,
}

c, err := newClient(tracerConfig, clientConfig)
Expand Down
5 changes: 3 additions & 2 deletions internal/newtelemetry/internal/mapper/app_started.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (t *appStartedReducer) Transform(payloads []transport.Payload) ([]transport
}
}

// The app-started event should be the first event in the payload
return t.wrapper.Transform(append([]transport.Payload{appStarted}, payloadLefts...))
// The app-started event should be the first event in the payload and not in an message-batch
payloads, mapper := t.wrapper.Transform(payloadLefts)
return append([]transport.Payload{appStarted}, payloads...), mapper
}
Loading

0 comments on commit ca1835b

Please sign in to comment.