From 640d6b04e262eac357e346be418a73fca67ca353 Mon Sep 17 00:00:00 2001 From: Sergei Lukianov Date: Mon, 25 Nov 2024 12:51:10 -0800 Subject: [PATCH] Enforce switch state every 2 mins (#673) Fixes #671 --- pkg/agent/agent.go | 104 ++++++++++++++++++++++++++++++--------------- 1 file changed, 69 insertions(+), 35 deletions(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index bdaf23d8..d080cbbe 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -185,35 +185,32 @@ func (svc *Service) Run(ctx context.Context, getClient func() (*gnmi.Client, err for { // process currently loaded agent from K8s - err = svc.processAgentFromKube(ctx, kube, agent, ¤tGen) + err = svc.processAgentFromKube(ctx, kube, agent, ¤tGen, false) if err != nil { return errors.Wrap(err, "failed to process agent config from k8s") } + if err := svc.hearbeat(ctx, kube, agent); err != nil { + return errors.Wrap(err, "failed to heartbeat after processing") + } select { case <-ctx.Done(): slog.Info("Context done, exiting") return nil - case <-time.After(15 * time.Second): - slog.Debug("Sending heartbeat", "name", agent.Name) - start := time.Now() - - if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil { - return errors.Wrapf(err, "failed to update switch state") + case <-time.After(2 * time.Minute): + slog.Debug("Enforcing config", "name", agent.Name) + err = svc.processAgentFromKube(ctx, kube, agent, ¤tGen, true) + if err != nil { + return errors.Wrap(err, "failed to process agent config from k8s (enforce)") } - if st := svc.reg.GetSwitchState(); st != nil { - agent.Status.State = *st + if err := svc.hearbeat(ctx, kube, agent); err != nil { + return errors.Wrap(err, "failed to heartbeat after enforce") } - agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()} - - err = kube.Status().Update(ctx, agent) - if err != nil { - return errors.Wrapf(err, "failed to update agent heartbeat") // TODO gracefully handle case if resourceVersion changed + case <-time.After(15 * time.Second): + if err := svc.hearbeat(ctx, kube, agent); err != nil { + return errors.Wrap(err, "failed to heartbeat") } - - svc.reg.AgentMetrics.HeartbeatDuration.Observe(time.Since(start).Seconds()) - svc.reg.AgentMetrics.HeartbeatsTotal.Inc() case event, ok := <-watcher.ResultChan(): // TODO check why channel gets closed if !ok { @@ -273,6 +270,35 @@ func (svc *Service) setInstallAndRunIDs() error { return nil } +func (svc *Service) hearbeat(ctx context.Context, kube client.Client, agent *agentapi.Agent) error { + if agent == nil { + slog.Debug("No agent to heartbeat") + + return nil + } + + slog.Debug("Sending heartbeat", "name", agent.Name) + start := time.Now() + + if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil { + return errors.Wrapf(err, "failed to update switch state") + } + if st := svc.reg.GetSwitchState(); st != nil { + agent.Status.State = *st + } + agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()} + + err := kube.Status().Update(ctx, agent) + if err != nil { + return errors.Wrapf(err, "failed to update agent heartbeat") // TODO gracefully handle case if resourceVersion changed + } + + svc.reg.AgentMetrics.HeartbeatDuration.Observe(time.Since(start).Seconds()) + svc.reg.AgentMetrics.HeartbeatsTotal.Inc() + + return nil +} + func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, readyCheck bool) error { start := time.Now() slog.Info("Processing agent config", "name", agent.Name, "gen", agent.Generation, "res", agent.ResourceVersion) @@ -386,38 +412,46 @@ func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, rea return errors.Wrapf(alloy.EnsureInstalled(ctx, agent, exporterPort), "failed to ensure alloy installed") } -func (svc *Service) processAgentFromKube(ctx context.Context, kube client.Client, agent *agentapi.Agent, currentGen *int64) error { +func (svc *Service) processAgentFromKube(ctx context.Context, kube client.Client, agent *agentapi.Agent, currentGen *int64, enforce bool) error { svc.reg.AgentMetrics.Generation.Set(float64(agent.Generation)) - if agent.Generation == *currentGen { + if !enforce && agent.Generation == *currentGen { return nil } start := time.Now() - slog.Info("Agent config changed", "current", *currentGen, "new", agent.Generation) + + if agent.Generation != *currentGen { + slog.Info("Agent config changed", "current", *currentGen, "new", agent.Generation) + } else if enforce { + slog.Info("Enforcing agent config", "gen", agent.Generation) + } if agent.Status.Conditions == nil { agent.Status.Conditions = []metav1.Condition{} } - // TODO better handle status condtions - apimeta.SetStatusCondition(&agent.Status.Conditions, metav1.Condition{ - Type: "Applied", - Status: metav1.ConditionFalse, - Reason: "ApplyPending", - LastTransitionTime: metav1.Time{Time: time.Now()}, - Message: fmt.Sprintf("Config will be applied, gen=%d", agent.Generation), - }) - // demonstrating that we're going to try to apply config - agent.Status.LastAttemptGen = agent.Generation - agent.Status.LastAttemptTime = metav1.Time{Time: time.Now()} + if !enforce { + // TODO better handle status condtions + apimeta.SetStatusCondition(&agent.Status.Conditions, metav1.Condition{ + Type: "Applied", + Status: metav1.ConditionFalse, + Reason: "ApplyPending", + LastTransitionTime: metav1.Time{Time: time.Now()}, + Message: fmt.Sprintf("Config will be applied, gen=%d", agent.Generation), + }) - err := kube.Status().Update(ctx, agent) - if err != nil { - return errors.Wrapf(err, "error updating agent last attempt") // TODO gracefully handle case if resourceVersion changed + // demonstrating that we're going to try to apply config + agent.Status.LastAttemptGen = agent.Generation + agent.Status.LastAttemptTime = metav1.Time{Time: time.Now()} + + err := kube.Status().Update(ctx, agent) + if err != nil { + return errors.Wrapf(err, "error updating agent last attempt") // TODO gracefully handle case if resourceVersion changed + } } - err = svc.processActions(ctx, agent) + err := svc.processActions(ctx, agent) if err != nil { return errors.Wrap(err, "failed to process agent actions from k8s") }