Skip to content

Commit

Permalink
chore: resolve linter issues for confusing-naming and confusing-resul…
Browse files Browse the repository at this point in the history
  • Loading branch information
zak-pawel authored Oct 12, 2022
1 parent 0087a5d commit 85b7590
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 214 deletions.
12 changes: 6 additions & 6 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (ac *accumulator) AddFields(
tags map[string]string,
t ...time.Time,
) {
ac.addFields(measurement, tags, fields, telegraf.Untyped, t...)
ac.addMeasurement(measurement, tags, fields, telegraf.Untyped, t...)
}

func (ac *accumulator) AddGauge(
Expand All @@ -46,7 +46,7 @@ func (ac *accumulator) AddGauge(
tags map[string]string,
t ...time.Time,
) {
ac.addFields(measurement, tags, fields, telegraf.Gauge, t...)
ac.addMeasurement(measurement, tags, fields, telegraf.Gauge, t...)
}

func (ac *accumulator) AddCounter(
Expand All @@ -55,7 +55,7 @@ func (ac *accumulator) AddCounter(
tags map[string]string,
t ...time.Time,
) {
ac.addFields(measurement, tags, fields, telegraf.Counter, t...)
ac.addMeasurement(measurement, tags, fields, telegraf.Counter, t...)
}

func (ac *accumulator) AddSummary(
Expand All @@ -64,7 +64,7 @@ func (ac *accumulator) AddSummary(
tags map[string]string,
t ...time.Time,
) {
ac.addFields(measurement, tags, fields, telegraf.Summary, t...)
ac.addMeasurement(measurement, tags, fields, telegraf.Summary, t...)
}

func (ac *accumulator) AddHistogram(
Expand All @@ -73,7 +73,7 @@ func (ac *accumulator) AddHistogram(
tags map[string]string,
t ...time.Time,
) {
ac.addFields(measurement, tags, fields, telegraf.Histogram, t...)
ac.addMeasurement(measurement, tags, fields, telegraf.Histogram, t...)
}

func (ac *accumulator) AddMetric(m telegraf.Metric) {
Expand All @@ -83,7 +83,7 @@ func (ac *accumulator) AddMetric(m telegraf.Metric) {
}
}

func (ac *accumulator) addFields(
func (ac *accumulator) addMeasurement(
measurement string,
tags map[string]string,
fields map[string]interface{},
Expand Down
104 changes: 55 additions & 49 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type processorUnit struct {
}

// aggregatorUnit is a group of Aggregators and their source and sink channels.
// Typically the aggregators write to a processor channel and pass the original
// Typically, the aggregators write to a processor channel and pass the original
// metrics to the output channel. The sink channels may be the same channel.

// ┌────────────┐
Expand Down Expand Up @@ -281,55 +281,59 @@ func (a *Agent) runInputs(
) {
var wg sync.WaitGroup
for _, input := range unit.inputs {
// Overwrite agent interval if this plugin has its own.
interval := time.Duration(a.Config.Agent.Interval)
if input.Config.Interval != 0 {
interval = input.Config.Interval
}
a.runInput(ctx, startTime, unit, input, &wg)
}

// Overwrite agent precision if this plugin has its own.
precision := time.Duration(a.Config.Agent.Precision)
if input.Config.Precision != 0 {
precision = input.Config.Precision
}
wg.Wait()

// Overwrite agent collection_jitter if this plugin has its own.
jitter := time.Duration(a.Config.Agent.CollectionJitter)
if input.Config.CollectionJitter != 0 {
jitter = input.Config.CollectionJitter
}
log.Printf("D! [agent] Stopping service inputs")
stopServiceInputs(unit.inputs)

// Overwrite agent collection_offset if this plugin has its own.
offset := time.Duration(a.Config.Agent.CollectionOffset)
if input.Config.CollectionOffset != 0 {
offset = input.Config.CollectionOffset
}
close(unit.dst)
log.Printf("D! [agent] Input channel closed")
}

var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
} else {
ticker = NewUnalignedTicker(interval, jitter, offset)
}
defer ticker.Stop()
func (a *Agent) runInput(ctx context.Context, startTime time.Time, unit *inputUnit, input *models.RunningInput, wg *sync.WaitGroup) {
// Overwrite agent interval if this plugin has its own.
interval := time.Duration(a.Config.Agent.Interval)
if input.Config.Interval != 0 {
interval = input.Config.Interval
}

acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(getPrecision(precision, interval))
// Overwrite agent precision if this plugin has its own.
precision := time.Duration(a.Config.Agent.Precision)
if input.Config.Precision != 0 {
precision = input.Config.Precision
}

wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()
a.gatherLoop(ctx, acc, input, ticker, interval)
}(input)
// Overwrite agent collection_jitter if this plugin has its own.
jitter := time.Duration(a.Config.Agent.CollectionJitter)
if input.Config.CollectionJitter != 0 {
jitter = input.Config.CollectionJitter
}

wg.Wait()
// Overwrite agent collection_offset if this plugin has its own.
offset := time.Duration(a.Config.Agent.CollectionOffset)
if input.Config.CollectionOffset != 0 {
offset = input.Config.CollectionOffset
}

log.Printf("D! [agent] Stopping service inputs")
stopServiceInputs(unit.inputs)
var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
} else {
ticker = NewUnalignedTicker(interval, jitter, offset)
}
defer ticker.Stop()

close(unit.dst)
log.Printf("D! [agent] Input channel closed")
acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(getPrecision(precision, interval))

wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()
a.gatherLoop(ctx, acc, input, ticker, interval)
}(input)
}

// testStartInputs is a variation of startInputs for use in --test and --once
Expand Down Expand Up @@ -377,6 +381,7 @@ func (a *Agent) testRunInputs(

nul := make(chan telegraf.Metric)
go func() {
//nolint:revive // empty block needed here
for range nul {
}
}()
Expand Down Expand Up @@ -721,7 +726,7 @@ func (a *Agent) connectOutput(ctx context.Context, output *models.RunningOutput)

err = output.Output.Connect()
if err != nil {
return fmt.Errorf("Error connecting to output %q: %w", output.LogName(), err)
return fmt.Errorf("error connecting to output %q: %w", output.LogName(), err)
}
}
log.Printf("D! [agent] Successfully connected to %s", output.LogName())
Expand Down Expand Up @@ -882,7 +887,7 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
}
}()

err := a.test(ctx, wait, src)
err := a.runTest(ctx, wait, src)
if err != nil {
return err
}
Expand All @@ -895,10 +900,10 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
return nil
}

// Test runs the agent and performs a single gather sending output to the
// outputF. After gathering pauses for the wait duration to allow service
// runTest runs the agent and performs a single gather sending output to the
// outputC. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) test(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error {
func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error {
log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
Expand Down Expand Up @@ -971,7 +976,7 @@ func (a *Agent) test(ctx context.Context, wait time.Duration, outputC chan<- tel

// Once runs the full agent for a single gather.
func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
err := a.once(ctx, wait)
err := a.runOnce(ctx, wait)
if err != nil {
return err
}
Expand All @@ -990,10 +995,10 @@ func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
return nil
}

// On runs the agent and performs a single gather sending output to the
// outputF. After gathering pauses for the wait duration to allow service
// runOnce runs the agent and performs a single gather sending output to the
// outputC. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) once(ctx context.Context, wait time.Duration) error {
func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
Expand Down Expand Up @@ -1094,6 +1099,7 @@ func getPrecision(precision, interval time.Duration) time.Duration {

// panicRecover displays an error if an input panics.
func panicRecover(input *models.RunningInput) {
//nolint:revive // recover is called inside a deferred function
if err := recover(); err != nil {
trace := make([]byte, 2048)
runtime.Stack(trace, true)
Expand Down
Loading

0 comments on commit 85b7590

Please sign in to comment.