Skip to content

Commit

Permalink
Enforce switch state every 2 mins (#673)
Browse files Browse the repository at this point in the history
Fixes #671
  • Loading branch information
Frostman authored Nov 25, 2024
1 parent 751f8f3 commit 640d6b0
Showing 1 changed file with 69 additions and 35 deletions.
104 changes: 69 additions & 35 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,35 +185,32 @@ func (svc *Service) Run(ctx context.Context, getClient func() (*gnmi.Client, err

for {
// process currently loaded agent from K8s
err = svc.processAgentFromKube(ctx, kube, agent, &currentGen)
err = svc.processAgentFromKube(ctx, kube, agent, &currentGen, false)
if err != nil {
return errors.Wrap(err, "failed to process agent config from k8s")
}
if err := svc.hearbeat(ctx, kube, agent); err != nil {
return errors.Wrap(err, "failed to heartbeat after processing")
}

select {
case <-ctx.Done():
slog.Info("Context done, exiting")

return nil
case <-time.After(15 * time.Second):
slog.Debug("Sending heartbeat", "name", agent.Name)
start := time.Now()

if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil {
return errors.Wrapf(err, "failed to update switch state")
case <-time.After(2 * time.Minute):
slog.Debug("Enforcing config", "name", agent.Name)
err = svc.processAgentFromKube(ctx, kube, agent, &currentGen, true)
if err != nil {
return errors.Wrap(err, "failed to process agent config from k8s (enforce)")
}
if st := svc.reg.GetSwitchState(); st != nil {
agent.Status.State = *st
if err := svc.hearbeat(ctx, kube, agent); err != nil {
return errors.Wrap(err, "failed to heartbeat after enforce")
}
agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()}

err = kube.Status().Update(ctx, agent)
if err != nil {
return errors.Wrapf(err, "failed to update agent heartbeat") // TODO gracefully handle case if resourceVersion changed
case <-time.After(15 * time.Second):
if err := svc.hearbeat(ctx, kube, agent); err != nil {
return errors.Wrap(err, "failed to heartbeat")
}

svc.reg.AgentMetrics.HeartbeatDuration.Observe(time.Since(start).Seconds())
svc.reg.AgentMetrics.HeartbeatsTotal.Inc()
case event, ok := <-watcher.ResultChan():
// TODO check why channel gets closed
if !ok {
Expand Down Expand Up @@ -273,6 +270,35 @@ func (svc *Service) setInstallAndRunIDs() error {
return nil
}

// hearbeat refreshes the switch state and publishes a heartbeat for agent.
//
// It asks the processor to update the switch state, copies the state held by
// the registry (when one is available) into agent.Status.State, stamps
// agent.Status.LastHeartbeat with the current time and writes the status
// through kube. After a successful write the heartbeat duration and the
// heartbeat counter metrics are updated.
//
// A nil agent is treated as a no-op and nil is returned. Any failure from the
// switch-state update or from the status write is returned wrapped with
// context.
func (svc *Service) hearbeat(ctx context.Context, kube client.Client, agent *agentapi.Agent) error {
	if agent == nil {
		slog.Debug("No agent to heartbeat")

		return nil
	}

	slog.Debug("Sending heartbeat", "name", agent.Name)
	begin := time.Now() // measured span covers the state refresh and the status write

	err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg)
	if err != nil {
		return errors.Wrap(err, "failed to update switch state")
	}

	// Only overwrite the reported state when the registry actually has one.
	state := svc.reg.GetSwitchState()
	if state != nil {
		agent.Status.State = *state
	}
	agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()}

	// TODO gracefully handle case if resourceVersion changed
	if err = kube.Status().Update(ctx, agent); err != nil {
		return errors.Wrap(err, "failed to update agent heartbeat")
	}

	elapsed := time.Since(begin).Seconds()
	svc.reg.AgentMetrics.HeartbeatDuration.Observe(elapsed)
	svc.reg.AgentMetrics.HeartbeatsTotal.Inc()

	return nil
}

func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, readyCheck bool) error {
start := time.Now()
slog.Info("Processing agent config", "name", agent.Name, "gen", agent.Generation, "res", agent.ResourceVersion)
Expand Down Expand Up @@ -386,38 +412,46 @@ func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, rea
return errors.Wrapf(alloy.EnsureInstalled(ctx, agent, exporterPort), "failed to ensure alloy installed")
}

func (svc *Service) processAgentFromKube(ctx context.Context, kube client.Client, agent *agentapi.Agent, currentGen *int64) error {
func (svc *Service) processAgentFromKube(ctx context.Context, kube client.Client, agent *agentapi.Agent, currentGen *int64, enforce bool) error {
svc.reg.AgentMetrics.Generation.Set(float64(agent.Generation))

if agent.Generation == *currentGen {
if !enforce && agent.Generation == *currentGen {
return nil
}

start := time.Now()
slog.Info("Agent config changed", "current", *currentGen, "new", agent.Generation)

if agent.Generation != *currentGen {
slog.Info("Agent config changed", "current", *currentGen, "new", agent.Generation)
} else if enforce {
slog.Info("Enforcing agent config", "gen", agent.Generation)
}

if agent.Status.Conditions == nil {
agent.Status.Conditions = []metav1.Condition{}
}
// TODO better handle status condtions
apimeta.SetStatusCondition(&agent.Status.Conditions, metav1.Condition{
Type: "Applied",
Status: metav1.ConditionFalse,
Reason: "ApplyPending",
LastTransitionTime: metav1.Time{Time: time.Now()},
Message: fmt.Sprintf("Config will be applied, gen=%d", agent.Generation),
})

// demonstrating that we're going to try to apply config
agent.Status.LastAttemptGen = agent.Generation
agent.Status.LastAttemptTime = metav1.Time{Time: time.Now()}
if !enforce {
// TODO better handle status conditions
apimeta.SetStatusCondition(&agent.Status.Conditions, metav1.Condition{
Type: "Applied",
Status: metav1.ConditionFalse,
Reason: "ApplyPending",
LastTransitionTime: metav1.Time{Time: time.Now()},
Message: fmt.Sprintf("Config will be applied, gen=%d", agent.Generation),
})

err := kube.Status().Update(ctx, agent)
if err != nil {
return errors.Wrapf(err, "error updating agent last attempt") // TODO gracefully handle case if resourceVersion changed
// demonstrating that we're going to try to apply config
agent.Status.LastAttemptGen = agent.Generation
agent.Status.LastAttemptTime = metav1.Time{Time: time.Now()}

err := kube.Status().Update(ctx, agent)
if err != nil {
return errors.Wrapf(err, "error updating agent last attempt") // TODO gracefully handle case if resourceVersion changed
}
}

err = svc.processActions(ctx, agent)
err := svc.processActions(ctx, agent)
if err != nil {
return errors.Wrap(err, "failed to process agent actions from k8s")
}
Expand Down

0 comments on commit 640d6b0

Please sign in to comment.