Skip to content

Commit

Permalink
Add policy_monitor apm spans
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Jan 30, 2024
1 parent e4d0af3 commit eeb92c1
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 8 deletions.
1 change: 1 addition & 0 deletions internal/pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ func (m *simpleMonitorT) Run(ctx context.Context) (err error) {
trans.End()
}
if m.debounceTime > 0 {
m.log.Debug().Dur("debounce_time", m.debounceTime).Msg("monitor debounce start")
// Introduce a debounce time before wait advance (the signal for new docs in the index)
// This is specifically done so we can introduce a delay in for cases like rapid policy changes
// where fleet-server may not have finished dispatching policies to all agents when a new change is detected.
Expand Down
60 changes: 52 additions & 8 deletions internal/pkg/policy/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ func NewMonitor(bulker bulk.Bulk, monitor monitor.Monitor, cfg config.ServerLimi
}
}

// endTrans is a convenience function to end the passed transaction if it's not nil
func endTrans(t *apm.Transaction) {
if t != nil {
t.End()
}
}

// Run runs the monitor.
func (m *monitorT) Run(ctx context.Context) error {
m.log = zerolog.Ctx(ctx).With().Str("ctx", "policy agent monitor").Logger()
Expand All @@ -132,21 +139,44 @@ func (m *monitorT) Run(ctx context.Context) error {

close(m.startCh)

var iCtx context.Context
var trans *apm.Transaction
LOOP:
for {
iCtx = ctx
select {
case <-m.kickCh:
if err := m.loadPolicies(ctx); err != nil {
if m.bulker.HasTracer() {
trans = m.bulker.StartTransaction("initial policies", "policy_monitor")
iCtx = apm.ContextWithTransaction(ctx, trans)
}

if err := m.loadPolicies(iCtx); err != nil {
endTrans(trans)
return err
}
m.dispatchPending(ctx)
m.dispatchPending(iCtx)
endTrans(trans)
case <-m.deployCh:
m.dispatchPending(ctx)
if m.bulker.HasTracer() {
trans = m.bulker.StartTransaction("forced policies", "policy_monitor")
iCtx = apm.ContextWithTransaction(ctx, trans)
}

m.dispatchPending(iCtx)
endTrans(trans)
case hits := <-s.Output():
if err := m.processHits(ctx, hits); err != nil {
if m.bulker.HasTracer() {
trans = m.bulker.StartTransaction("output policies", "policy_monitor")
iCtx = apm.ContextWithTransaction(ctx, trans)
}

if err := m.processHits(iCtx, hits); err != nil {
endTrans(trans)
return err
}
m.dispatchPending(ctx)
m.dispatchPending(iCtx)
endTrans(trans)
case <-ctx.Done():
break LOOP
}
Expand All @@ -168,6 +198,9 @@ func unmarshalHits(hits []es.HitT) ([]model.Policy, error) {
}

func (m *monitorT) processHits(ctx context.Context, hits []es.HitT) error {
span, ctx := apm.StartSpan(ctx, "process hits", "process")
defer span.End()

policies, err := unmarshalHits(hits)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("fail unmarshal hits")
Expand All @@ -191,6 +224,8 @@ func (m *monitorT) waitStart(ctx context.Context) error {
// dispatchPending will dispatch all pending policy changes to the subscriptions in the queue.
// dispatches are rate limited by the monitor's limiter.
func (m *monitorT) dispatchPending(ctx context.Context) {
span, ctx := apm.StartSpan(ctx, "dispatch pending", "dispatch")
defer span.End()
m.mut.Lock()
defer m.mut.Unlock()

Expand Down Expand Up @@ -244,7 +279,10 @@ func (m *monitorT) dispatchPending(ctx context.Context) {
}

func (m *monitorT) loadPolicies(ctx context.Context) error {
if m.bulker.HasTracer() {
span, ctx := apm.StartSpan(ctx, "Load policies", "load")
defer span.End()

if m.bulker.HasTracer() { // TODO link bulker transaction with the policy_monitor one
trans := m.bulker.StartTransaction("Load policies", "bulker")
ctx = apm.ContextWithTransaction(ctx, trans)
defer trans.End()
Expand Down Expand Up @@ -278,7 +316,7 @@ func (m *monitorT) processPolicies(ctx context.Context, policies []model.Policy)
return err
}

m.updatePolicy(pp)
m.updatePolicy(ctx, pp)
}
return nil
}
Expand All @@ -305,9 +343,15 @@ func (m *monitorT) groupByLatest(policies []model.Policy) map[string]model.Polic
return groupByLatest(policies)
}

func (m *monitorT) updatePolicy(pp *ParsedPolicy) bool {
func (m *monitorT) updatePolicy(ctx context.Context, pp *ParsedPolicy) bool {
newPolicy := pp.Policy

span, _ := apm.StartSpan(ctx, "update policy", "process")
span.Context.SetLabel(logger.PolicyID, newPolicy.PolicyID)
span.Context.SetLabel("revision_idx", newPolicy.RevisionIdx)
span.Context.SetLabel("coordinator_idx", newPolicy.CoordinatorIdx)
defer span.End()

zlog := m.log.With().
Str(logger.PolicyID, newPolicy.PolicyID).
Int64("revision_idx", newPolicy.RevisionIdx).
Expand Down

0 comments on commit eeb92c1

Please sign in to comment.