Skip to content

Commit

Permalink
WIP: changes
Browse files Browse the repository at this point in the history
  • Loading branch information
danielstokes committed Dec 24, 2024
1 parent 185d36f commit 529ae14
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 247 deletions.
6 changes: 3 additions & 3 deletions src/apm/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
const (
envHealthFleetControlFilepath = "NEW_RELIC_FLEET_CONTROL_HEALTH_PATH"
envHealthListenPort = "NEW_RELIC_SIDECAR_LISTEN_PORT"
healthSidecarContainerName = "newrelic-apm-health-sidecar"
HealthSidecarContainerName = "newrelic-apm-health-sidecar"
healthVolumeName = "newrelic-apm-health-volume"

HealthInstrumentedAnnotation = "newrelic.com/apm-health"
Expand Down Expand Up @@ -108,7 +108,7 @@ func (i *baseInjector) injectHealth(ctx context.Context, inst v1alpha2.Instrumen
}

// We just inject Volumes and init containers for the first processed container.
if isInitContainerMissing(pod, healthSidecarContainerName) {
if isInitContainerMissing(pod, HealthSidecarContainerName) {
if isPodVolumeMissing(pod, healthVolumeName) {
pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{
Name: healthVolumeName,
Expand All @@ -119,7 +119,7 @@ func (i *baseInjector) injectHealth(ctx context.Context, inst v1alpha2.Instrumen

restartAlways := corev1.ContainerRestartPolicyAlways
sidecarContainer := corev1.Container{
Name: healthSidecarContainerName,
Name: HealthSidecarContainerName,
Image: inst.Spec.HealthAgent.Image,
RestartPolicy: &restartAlways,
VolumeMounts: []corev1.VolumeMount{{
Expand Down
34 changes: 17 additions & 17 deletions src/apm/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestHealthInjector_Inject(t *testing.T) {
Spec: v1alpha2.InstrumentationSpec{
HealthAgent: v1alpha2.HealthAgent{
Image: "health",
Env: []corev1.EnvVar{{Name: "NEW_RELIC_FLEET_CONTROL_HEALTH_PATH", Value: "/health/this"}},
Env: []corev1.EnvVar{{Name: envHealthFleetControlFilepath, Value: "/health/this"}},
},
},
},
Expand All @@ -83,31 +83,31 @@ func TestHealthInjector_Inject(t *testing.T) {
},
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{{
Name: "newrelic-apm-health-sidecar",
Name: HealthSidecarContainerName,
Image: "health",
VolumeMounts: []corev1.VolumeMount{{
Name: "newrelic-apm-health-volume",
Name: healthVolumeName,
MountPath: "/health/this",
}},
Env: []corev1.EnvVar{
{Name: "NEW_RELIC_FLEET_CONTROL_HEALTH_PATH", Value: "/health/this"},
{Name: "NEW_RELIC_SIDECAR_LISTEN_PORT", Value: "6194"},
{Name: envHealthFleetControlFilepath, Value: "/health/this"},
{Name: envHealthListenPort, Value: "6194"},
},
RestartPolicy: &restartAlways,
Ports: []corev1.ContainerPort{{ContainerPort: 6194}},
}},
Containers: []corev1.Container{{
Name: "test",
VolumeMounts: []corev1.VolumeMount{{
Name: "newrelic-apm-health-volume",
Name: healthVolumeName,
MountPath: "/health/this",
}},
Env: []corev1.EnvVar{
{Name: "NEW_RELIC_FLEET_CONTROL_HEALTH_PATH", Value: "/health/this"},
{Name: envHealthFleetControlFilepath, Value: "/health/this"},
},
}},
Volumes: []corev1.Volume{{
Name: "newrelic-apm-health-volume",
Name: healthVolumeName,
VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
}},
}},
Expand All @@ -122,8 +122,8 @@ func TestHealthInjector_Inject(t *testing.T) {
HealthAgent: v1alpha2.HealthAgent{
Image: "health",
Env: []corev1.EnvVar{
{Name: "NEW_RELIC_FLEET_CONTROL_HEALTH_PATH", Value: "/health/this"},
{Name: "NEW_RELIC_SIDECAR_LISTEN_PORT", Value: "6194"},
{Name: envHealthFleetControlFilepath, Value: "/health/this"},
{Name: envHealthListenPort, Value: "6194"},
},
},
},
Expand All @@ -132,31 +132,31 @@ func TestHealthInjector_Inject(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"newrelic.com/apm-health": "true"}},
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{{
Name: "newrelic-apm-health-sidecar",
Name: HealthSidecarContainerName,
Image: "health",
VolumeMounts: []corev1.VolumeMount{{
Name: "newrelic-apm-health-volume",
Name: healthVolumeName,
MountPath: "/health/this",
}},
Env: []corev1.EnvVar{
{Name: "NEW_RELIC_FLEET_CONTROL_HEALTH_PATH", Value: "/health/this"},
{Name: "NEW_RELIC_SIDECAR_LISTEN_PORT", Value: "6194"},
{Name: envHealthFleetControlFilepath, Value: "/health/this"},
{Name: envHealthListenPort, Value: "6194"},
},
RestartPolicy: &restartAlways,
Ports: []corev1.ContainerPort{{ContainerPort: 6194}},
}},
Containers: []corev1.Container{{
Name: "test",
VolumeMounts: []corev1.VolumeMount{{
Name: "newrelic-apm-health-volume",
Name: healthVolumeName,
MountPath: "/health/this",
}},
Env: []corev1.EnvVar{
{Name: "NEW_RELIC_FLEET_CONTROL_HEALTH_PATH", Value: "/health/this"},
{Name: envHealthFleetControlFilepath, Value: "/health/this"},
},
}},
Volumes: []corev1.Volume{{
Name: "newrelic-apm-health-volume",
Name: healthVolumeName,
VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
}},
}},
Expand Down
57 changes: 48 additions & 9 deletions src/instrumentation/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

const (
instrumentationVersionAnnotation = "newrelic.com/instrumentation-versions"
healthSidecarContainerName = "newrelic-apm-health-sidecar"
healthSidecarContainerName = apm.HealthSidecarContainerName
healthUrlFormat = "http://%s:%d/healthz"
)

Expand All @@ -43,6 +43,27 @@ const (
triggerHealthCheck
)

func (ea eventAction) String() string {
switch ea {
case podSet:
return "podSet"
case podRemove:
return "podRemove"
case instSet:
return "instSet"
case instRemove:
return "instRemove"
case nsSet:
return "nsSet"
case nsRemove:
return "nsRemove"
case triggerHealthCheck:
return "triggerHealthCheck"
default:
return "unknown"
}
}

type event struct {
action eventAction
pod *corev1.Pod
Expand Down Expand Up @@ -240,26 +261,32 @@ func NewHealthMonitor(

func (m *HealthMonitor) resourceQueueEvent(ctx context.Context, ev event) {
logger := log.FromContext(ctx)
logger.Info("event", "action", ev.action)
logger.V(1).Info("event", "action", ev.action)
switch ev.action {
case nsSet:
logger.V(1).Info("event", "action", ev.action.String(), "entity", "namespace/"+ev.ns.Name)
m.namespaces[ev.ns.Name] = ev.ns
case nsRemove:
logger.V(1).Info("event", "action", ev.action.String(), "entity", "namespace/"+ev.ns.Name)
delete(m.namespaces, ev.ns.Name)
case podSet:
logger.V(1).Info("event", "action", ev.action.String(), "entity", "namespace/"+ev.pod.Namespace+"/pod/"+ev.pod.Name)
m.pods[ev.pod.Namespace+"/"+ev.pod.Name] = ev.pod
case podRemove:
logger.V(1).Info("event", "action", ev.action.String(), "entity", "namespace/"+ev.pod.Namespace+"/pod/"+ev.pod.Name)
delete(m.pods, ev.pod.Namespace+"/"+ev.pod.Name)
case instSet:
logger.V(1).Info("event", "action", ev.action.String(), "entity", "namespace/"+ev.inst.Namespace+"/instrumentation/"+ev.inst.Name)
m.instrumentations[ev.inst.Namespace+"/"+ev.inst.Name] = ev.inst
case instRemove:
logger.V(1).Info("event", "action", ev.action.String(), "entity", "namespace/"+ev.inst.Namespace+"/instrumentation/"+ev.inst.Name)
delete(m.instrumentations, ev.inst.Namespace+"/"+ev.inst.Name)
case triggerHealthCheck:
if atomic.LoadInt64(&m.healthCheckActive) == 1 {
return
}

logger.Info("health check start")
logger.Info("trigger health check")
podMetrics := m.getPodMetrics(ctx)
if len(podMetrics) == 0 {
logger.Info("nothing to check the health of. No pods")
Expand Down Expand Up @@ -300,10 +327,12 @@ func (m *HealthMonitor) healthCheckQueueEvent(ctx context.Context, event healthC

totalTime := time.Since(healthCheckStartTime)

logger.Info("health check time", "duration", totalTime.String())

// skip a tick (or more) if the time it takes exceeds our interval. round off the extra, otherwise we always skip at least 1
m.checksToSkip = int64(totalTime / m.tickInterval)
if m.checksToSkip > 0 {
logger.Info("Skipping health checks", "skip_count", m.checksToSkip, "interval", m.tickInterval.String())
logger.Info("skipping health checks", "skip_count", m.checksToSkip, "interval", m.tickInterval.String())
}
}

Expand Down Expand Up @@ -355,7 +384,17 @@ func (m *HealthMonitor) instrumentationMetricQueueEvent(ctx context.Context, eve

func (m *HealthMonitor) instrumentationMetricPersistQueueEvent(ctx context.Context, event *instrumentationMetric) {
defer event.resolve()
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).WithValues(
"id", event.instrumentationID,
"pods", map[string]int64{
"matching": event.podsMatching,
"outdated": event.podsOutdated,
"unhealthy": event.podsUnhealthy,
"not_ready": event.podsNotReady,
"healthy": event.podsHealthy,
"injected": event.podsInjected,
},
)
if event.isDiff() {
event.syncStatus()
event.instrumentation.Status.LastUpdated = metav1.Now()
Expand All @@ -369,9 +408,9 @@ func (m *HealthMonitor) instrumentationMetricPersistQueueEvent(ctx context.Conte
if err := m.instrumentationStatusUpdater.UpdateInstrumentationStatus(ctx, event.instrumentation); err != nil {
logger.Error(err, "failed to update status for instrumentation")
}
logger.Info("wrote status for instrumentation", "id", event.instrumentationID)
logger.Info("wrote status for instrumentation")
} else {
logger.Info("no changes to status for instrumentation", "id", event.instrumentationID)
logger.Info("no changes to status for instrumentation")
}
}

Expand Down Expand Up @@ -512,7 +551,7 @@ func (m *HealthMonitor) check(ctx context.Context, podMetricItem *podMetric) Hea
if !m.isPodReady(podMetricItem.pod) {
return Health{}
}
logger.Info("checking health for pod", "pod", podMetricItem.podID)
logger.V(2).Info("checking health for pod", "pod", podMetricItem.podID)

podHealthUrl, err := m.getHealthUrlFromPod(podMetricItem.pod)
if err != nil {
Expand All @@ -531,7 +570,7 @@ func (m *HealthMonitor) check(ctx context.Context, podMetricItem *podMetric) Hea
LastError: fmt.Sprintf("failed while retrieving health > %s", err.Error()),
}
}
logger.Info("collected health for pod", "pod", podMetricItem.podID, "health", health)
logger.V(2).Info("collected health for pod", "pod", podMetricItem.podID, "health", health)
return health
}

Expand Down
39 changes: 18 additions & 21 deletions src/instrumentation/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@ package instrumentation
import (
"context"
"fmt"
"gopkg.in/yaml.v3"
"io"
"net/http"

"gopkg.in/yaml.v3"
)

type Health struct {
Healthy bool `json:"healthy"`
Status string `json:"status"`
StartTime string `json:"start_time_unix_nano"`
StatusTime string `json:"status_time_unix_nano"`
AgentRunID string `json:"agent_run_id"`
LastError string `json:"last_error"`
ComponentHealthMap map[string]*Health `json:"component_health_map"`
Healthy bool `json:"healthy" yaml:"healthy"`
Status string `json:"status" yaml:"status"`
StartTime int64 `json:"start_time_unix_nano" yaml:"start_time_unix_nano"`
StatusTime int64 `json:"status_time_unix_nano" yaml:"status_time_unix_nano"`
AgentRunID string `json:"agent_run_id" yaml:"agent_run_id"`
LastError string `json:"last_error" yaml:"last_error"`
ComponentHealthMap map[string]*Health `json:"component_health_map,omitempty" yaml:"component_health_map,omitempty"`
}

type HealthCheck interface {
Expand All @@ -37,29 +36,27 @@ func NewHealthCheckApi(httpClient *http.Client) *HealthCheckApi {
}

func (h *HealthCheckApi) GetHealth(ctx context.Context, url string) (health Health, err error) {
httpReq, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
var (
httpReq *http.Request
res *http.Response
body []byte
)
if httpReq, err = http.NewRequest(http.MethodGet, url, nil); err != nil {
return health, fmt.Errorf("failed to create request > %w", err)
}

httpClient := &http.Client{}
res, err := httpClient.Do(httpReq.WithContext(ctx))
if err != nil {
if res, err = h.httpClient.Do(httpReq.WithContext(ctx)); err != nil {
return health, fmt.Errorf("failed to send request > %w", err)
}
if res.Body != nil {
defer res.Body.Close()
}
if res.StatusCode != http.StatusOK {
return health, fmt.Errorf("failed to get expected response code of 200 > %w", err)
return health, fmt.Errorf("failed to get expected response code of 200, got %d > %w", res.StatusCode, err)
}
body, err := io.ReadAll(res.Body)
if err != nil {
if body, err = io.ReadAll(res.Body); err != nil {
return health, fmt.Errorf("failed to read response body > %w", err)
}

err = yaml.Unmarshal(body, &health)
if err != nil {
if err = yaml.Unmarshal(body, &health); err != nil {
return health, fmt.Errorf("failed to parse response > %w", err)
}
return health, nil
Expand Down
31 changes: 31 additions & 0 deletions src/instrumentation/health_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package instrumentation

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/google/go-cmp/cmp"
)

func TestHealthCheckApi_GetHealth(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`
healthy: true
status: "not ready"
status_time_unix_nano: 1734559668210947000
start_time_unix_nano: 1734559614000000000
agent_run_id: "1234-56"
last_error: "some error"
`))
}))
defer testServer.Close()
healthChecker := NewHealthCheckApi(nil)
health, _ := healthChecker.GetHealth(context.Background(), testServer.URL)
diff := cmp.Diff(health, Health{LastError: "some error", Status: "not ready", Healthy: true, StatusTime: 1734559668210947000, StartTime: 1734559614000000000, AgentRunID: "1234-56"})
if diff != "" {
t.Fatal(diff)
}
}
Loading

0 comments on commit 529ae14

Please sign in to comment.