diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index bdaf23d8..748ccb2d 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -185,35 +185,32 @@ func (svc *Service) Run(ctx context.Context, getClient func() (*gnmi.Client, err for { // process currently loaded agent from K8s - err = svc.processAgentFromKube(ctx, kube, agent, &currentGen) + err = svc.processAgentFromKube(ctx, kube, agent, &currentGen, false) if err != nil { return errors.Wrap(err, "failed to process agent config from k8s") } + if err := svc.hearbeat(ctx, kube, agent); err != nil { + return errors.Wrap(err, "failed to heartbeat after processing") + } select { case <-ctx.Done(): slog.Info("Context done, exiting") return nil - case <-time.After(15 * time.Second): - slog.Debug("Sending heartbeat", "name", agent.Name) - start := time.Now() - - if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil { - return errors.Wrapf(err, "failed to update switch state") + case <-time.After(2 * time.Minute): + slog.Debug("Enforcing config", "name", agent.Name) + err = svc.processAgentFromKube(ctx, kube, agent, &currentGen, false) + if err != nil { + return errors.Wrap(err, "failed to process agent config from k8s (enforce)") } - if st := svc.reg.GetSwitchState(); st != nil { - agent.Status.State = *st + if err := svc.hearbeat(ctx, kube, agent); err != nil { + return errors.Wrap(err, "failed to heartbeat after enforce") } - agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()} - - err = kube.Status().Update(ctx, agent) - if err != nil { - return errors.Wrapf(err, "failed to update agent heartbeat") // TODO gracefully handle case if resourceVersion changed + case <-time.After(15 * time.Second): + if err := svc.hearbeat(ctx, kube, agent); err != nil { + return errors.Wrap(err, "failed to heartbeat") } - - svc.reg.AgentMetrics.HeartbeatDuration.Observe(time.Since(start).Seconds()) - svc.reg.AgentMetrics.HeartbeatsTotal.Inc() case event, ok := <-watcher.ResultChan(): // TODO check why channel gets closed 
if !ok { @@ -273,6 +270,35 @@ func (svc *Service) setInstallAndRunIDs() error { return nil } +func (svc *Service) hearbeat(ctx context.Context, kube client.Client, agent *agentapi.Agent) error { + if agent == nil { + slog.Debug("No agent to heartbeat") + + return nil + } + + slog.Debug("Sending heartbeat", "name", agent.Name) + start := time.Now() + + if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil { + return errors.Wrapf(err, "failed to update switch state") + } + if st := svc.reg.GetSwitchState(); st != nil { + agent.Status.State = *st + } + agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()} + + err := kube.Status().Update(ctx, agent) + if err != nil { + return errors.Wrapf(err, "failed to update agent heartbeat") // TODO gracefully handle case if resourceVersion changed + } + + svc.reg.AgentMetrics.HeartbeatDuration.Observe(time.Since(start).Seconds()) + svc.reg.AgentMetrics.HeartbeatsTotal.Inc() + + return nil +} + func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, readyCheck bool) error { start := time.Now() slog.Info("Processing agent config", "name", agent.Name, "gen", agent.Generation, "res", agent.ResourceVersion) @@ -386,7 +412,7 @@ func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, rea return errors.Wrapf(alloy.EnsureInstalled(ctx, agent, exporterPort), "failed to ensure alloy installed") } -func (svc *Service) processAgentFromKube(ctx context.Context, kube client.Client, agent *agentapi.Agent, currentGen *int64) error { +func (svc *Service) processAgentFromKube(ctx context.Context, kube client.Client, agent *agentapi.Agent, currentGen *int64, enforce bool) error { svc.reg.AgentMetrics.Generation.Set(float64(agent.Generation)) if agent.Generation == *currentGen {