Skip to content

Commit

Permalink
Enforce switch state every 2 mins
Browse files (browse the repository at this point in the history)
  • Loading branch information
Frostman committed Nov 22, 2024
1 parent 751f8f3 commit dac2d84
Showing 1 changed file with 44 additions and 18 deletions.
62 changes: 44 additions & 18 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,35 +185,32 @@ func (svc *Service) Run(ctx context.Context, getClient func() (*gnmi.Client, err

for {
// process currently loaded agent from K8s
err = svc.processAgentFromKube(ctx, kube, agent, &currentGen)
err = svc.processAgentFromKube(ctx, kube, agent, &currentGen, false)
if err != nil {
return errors.Wrap(err, "failed to process agent config from k8s")
}
if err := svc.hearbeat(ctx, kube, agent); err != nil {
return errors.Wrap(err, "failed to heartbeat after processing")
}

select {
case <-ctx.Done():
slog.Info("Context done, exiting")

return nil
case <-time.After(15 * time.Second):
slog.Debug("Sending heartbeat", "name", agent.Name)
start := time.Now()

if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil {
return errors.Wrapf(err, "failed to update switch state")
case <-time.After(2 * time.Minute):
slog.Debug("Enforcing config", "name", agent.Name)
err = svc.processAgentFromKube(ctx, kube, agent, &currentGen, false)
if err != nil {
return errors.Wrap(err, "failed to process agent config from k8s (enforce)")
}
if st := svc.reg.GetSwitchState(); st != nil {
agent.Status.State = *st
if err := svc.hearbeat(ctx, kube, agent); err != nil {
return errors.Wrap(err, "failed to heartbeat after enforce")
}
agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()}

err = kube.Status().Update(ctx, agent)
if err != nil {
return errors.Wrapf(err, "failed to update agent heartbeat") // TODO gracefully handle case if resourceVersion changed
case <-time.After(15 * time.Second):
if err := svc.hearbeat(ctx, kube, agent); err != nil {
return errors.Wrap(err, "failed to heartbeat")
}

svc.reg.AgentMetrics.HeartbeatDuration.Observe(time.Since(start).Seconds())
svc.reg.AgentMetrics.HeartbeatsTotal.Inc()
case event, ok := <-watcher.ResultChan():
// TODO check why channel gets closed
if !ok {
Expand Down Expand Up @@ -273,6 +270,35 @@ func (svc *Service) setInstallAndRunIDs() error {
return nil
}

// hearbeat refreshes the switch state via the processor and writes it,
// together with a new LastHeartbeat timestamp, to the agent's status through
// the supplied kube client.
//
// A nil agent is treated as "nothing to do": the call logs at debug level and
// returns nil. The switch state in the registry is copied into the agent
// status only when the registry actually holds one. Heartbeat metrics
// (duration and total count) are recorded only after the status update
// succeeds; the measured duration spans both the switch state refresh and the
// status update.
//
// Returns a wrapped error if refreshing the switch state or updating the
// agent status fails.
func (svc *Service) hearbeat(ctx context.Context, kube client.Client, agent *agentapi.Agent) error {
	if agent == nil {
		slog.Debug("No agent to heartbeat")

		return nil
	}

	slog.Debug("Sending heartbeat", "name", agent.Name)
	began := time.Now()

	err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg)
	if err != nil {
		return errors.Wrapf(err, "failed to update switch state")
	}

	// Only overwrite the reported state when the registry has one to offer.
	state := svc.reg.GetSwitchState()
	if state != nil {
		agent.Status.State = *state
	}
	agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()}

	// TODO gracefully handle case if resourceVersion changed
	if err = kube.Status().Update(ctx, agent); err != nil {
		return errors.Wrapf(err, "failed to update agent heartbeat")
	}

	elapsed := time.Since(began)
	svc.reg.AgentMetrics.HeartbeatDuration.Observe(elapsed.Seconds())
	svc.reg.AgentMetrics.HeartbeatsTotal.Inc()

	return nil
}

func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, readyCheck bool) error {
start := time.Now()
slog.Info("Processing agent config", "name", agent.Name, "gen", agent.Generation, "res", agent.ResourceVersion)
Expand Down Expand Up @@ -386,7 +412,7 @@ func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, rea
return errors.Wrapf(alloy.EnsureInstalled(ctx, agent, exporterPort), "failed to ensure alloy installed")
}

func (svc *Service) processAgentFromKube(ctx context.Context, kube client.Client, agent *agentapi.Agent, currentGen *int64) error {
func (svc *Service) processAgentFromKube(ctx context.Context, kube client.Client, agent *agentapi.Agent, currentGen *int64, enforce bool) error {

Check failure on line 415 in pkg/agent/agent.go

View workflow job for this annotation

GitHub Actions / test

unused-parameter: parameter 'enforce' seems to be unused, consider removing or renaming it as _ (revive)
svc.reg.AgentMetrics.Generation.Set(float64(agent.Generation))

if agent.Generation == *currentGen {
Expand Down

0 comments on commit dac2d84

Please sign in to comment.