Skip to content

Commit

Permalink
Revive fixes - part 2 (influxdata#8835)
Browse files Browse the repository at this point in the history
* Revive fixes regarding following set of rules:
[rule.if-return]
[rule.increment-decrement]
[rule.var-declaration]
[rule.package-comments]
[rule.receiver-naming]
[rule.unexported-return]
  • Loading branch information
zak-pawel authored Feb 16, 2021
1 parent 5606a95 commit d9736d5
Show file tree
Hide file tree
Showing 81 changed files with 732 additions and 861 deletions.
4 changes: 1 addition & 3 deletions agent/tick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/stretchr/testify/require"
)

var format = "2006-01-02T15:04:05.999Z07:00"

func TestAlignedTicker(t *testing.T) {
interval := 10 * time.Second
jitter := 0 * time.Second
Expand Down Expand Up @@ -249,7 +247,7 @@ func simulatedDist(ticker Ticker, clock *clock.Mock) Distribution {
for !clock.Now().After(until) {
select {
case tm := <-ticker.Elapsed():
dist.Buckets[tm.Second()] += 1
dist.Buckets[tm.Second()]++
dist.Count++
dist.Waittime += tm.Sub(last).Seconds()
last = tm
Expand Down
11 changes: 2 additions & 9 deletions internal/rotate/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,7 @@ func (w *FileWriter) openCurrent() (err error) {
w.bytesWritten = fileInfo.Size()
}

if err = w.rotateIfNeeded(); err != nil {
return err
}
return nil
return w.rotateIfNeeded()
}

func (w *FileWriter) rotateIfNeeded() error {
Expand All @@ -153,11 +150,7 @@ func (w *FileWriter) rotate() (err error) {
return err
}

if err = w.purgeArchivesIfNeeded(); err != nil {
return err
}

return nil
return w.purgeArchivesIfNeeded()
}

func (w *FileWriter) purgeArchivesIfNeeded() (err error) {
Expand Down
20 changes: 10 additions & 10 deletions internal/snmp/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@ type GosnmpWrapper struct {
}

// Host returns the value of GoSNMP.Target.
func (gsw GosnmpWrapper) Host() string {
return gsw.Target
func (gs GosnmpWrapper) Host() string {
return gs.Target
}

// Walk wraps GoSNMP.Walk() or GoSNMP.BulkWalk(), depending on whether the
// connection is using SNMPv1 or newer.
// Also, if any error is encountered, it will just once reconnect and try again.
func (gsw GosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error {
func (gs GosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error {
var err error
// On error, retry once.
// Unfortunately we can't distinguish between an error returned by gosnmp, and one returned by the walk function.
for i := 0; i < 2; i++ {
if gsw.Version == gosnmp.Version1 {
err = gsw.GoSNMP.Walk(oid, fn)
if gs.Version == gosnmp.Version1 {
err = gs.GoSNMP.Walk(oid, fn)
} else {
err = gsw.GoSNMP.BulkWalk(oid, fn)
err = gs.GoSNMP.BulkWalk(oid, fn)
}
if err == nil {
return nil
}
if err := gsw.GoSNMP.Connect(); err != nil {
if err := gs.GoSNMP.Connect(); err != nil {
return fmt.Errorf("reconnecting: %w", err)
}
}
Expand All @@ -44,15 +44,15 @@ func (gsw GosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error {

// Get wraps GoSNMP.GET().
// If any error is encountered, it will just once reconnect and try again.
func (gsw GosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) {
func (gs GosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) {
var err error
var pkt *gosnmp.SnmpPacket
for i := 0; i < 2; i++ {
pkt, err = gsw.GoSNMP.Get(oids)
pkt, err = gs.GoSNMP.Get(oids)
if err == nil {
return pkt, nil
}
if err := gsw.GoSNMP.Connect(); err != nil {
if err := gs.GoSNMP.Connect(); err != nil {
return nil, fmt.Errorf("reconnecting: %w", err)
}
}
Expand Down
86 changes: 43 additions & 43 deletions models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

const (
// Default size of metrics batch size.
DEFAULT_METRIC_BATCH_SIZE = 1000
DefaultMetricBatchSize = 1000

// Default number of metrics kept. It should be a multiple of batch size.
DEFAULT_METRIC_BUFFER_LIMIT = 10000
DefaultMetricBufferLimit = 10000
)

// OutputConfig containing name and filter
Expand Down Expand Up @@ -78,13 +78,13 @@ func NewRunningOutput(
bufferLimit = config.MetricBufferLimit
}
if bufferLimit == 0 {
bufferLimit = DEFAULT_METRIC_BUFFER_LIMIT
bufferLimit = DefaultMetricBufferLimit
}
if config.MetricBatchSize > 0 {
batchSize = config.MetricBatchSize
}
if batchSize == 0 {
batchSize = DEFAULT_METRIC_BATCH_SIZE
batchSize = DefaultMetricBatchSize
}

ro := &RunningOutput{
Expand Down Expand Up @@ -114,8 +114,8 @@ func (r *RunningOutput) LogName() string {
return logName("outputs", r.Config.Name, r.Config.Alias)
}

func (ro *RunningOutput) metricFiltered(metric telegraf.Metric) {
ro.MetricsFiltered.Incr(1)
func (r *RunningOutput) metricFiltered(metric telegraf.Metric) {
r.MetricsFiltered.Incr(1)
metric.Drop()
}

Expand All @@ -133,96 +133,96 @@ func (r *RunningOutput) Init() error {
// AddMetric adds a metric to the output.
//
// Takes ownership of metric
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
if ok := ro.Config.Filter.Select(metric); !ok {
ro.metricFiltered(metric)
func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
if ok := r.Config.Filter.Select(metric); !ok {
r.metricFiltered(metric)
return
}

ro.Config.Filter.Modify(metric)
r.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
ro.metricFiltered(metric)
r.metricFiltered(metric)
return
}

if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
ro.aggMutex.Lock()
if output, ok := r.Output.(telegraf.AggregatingOutput); ok {
r.aggMutex.Lock()
output.Add(metric)
ro.aggMutex.Unlock()
r.aggMutex.Unlock()
return
}

if len(ro.Config.NameOverride) > 0 {
metric.SetName(ro.Config.NameOverride)
if len(r.Config.NameOverride) > 0 {
metric.SetName(r.Config.NameOverride)
}

if len(ro.Config.NamePrefix) > 0 {
metric.AddPrefix(ro.Config.NamePrefix)
if len(r.Config.NamePrefix) > 0 {
metric.AddPrefix(r.Config.NamePrefix)
}

if len(ro.Config.NameSuffix) > 0 {
metric.AddSuffix(ro.Config.NameSuffix)
if len(r.Config.NameSuffix) > 0 {
metric.AddSuffix(r.Config.NameSuffix)
}

dropped := ro.buffer.Add(metric)
atomic.AddInt64(&ro.droppedMetrics, int64(dropped))
dropped := r.buffer.Add(metric)
atomic.AddInt64(&r.droppedMetrics, int64(dropped))

count := atomic.AddInt64(&ro.newMetricsCount, 1)
if count == int64(ro.MetricBatchSize) {
atomic.StoreInt64(&ro.newMetricsCount, 0)
count := atomic.AddInt64(&r.newMetricsCount, 1)
if count == int64(r.MetricBatchSize) {
atomic.StoreInt64(&r.newMetricsCount, 0)
select {
case ro.BatchReady <- time.Now():
case r.BatchReady <- time.Now():
default:
}
}
}

// Write writes all metrics to the output, stopping when all have been sent on
// or error.
func (ro *RunningOutput) Write() error {
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
ro.aggMutex.Lock()
func (r *RunningOutput) Write() error {
if output, ok := r.Output.(telegraf.AggregatingOutput); ok {
r.aggMutex.Lock()
metrics := output.Push()
ro.buffer.Add(metrics...)
r.buffer.Add(metrics...)
output.Reset()
ro.aggMutex.Unlock()
r.aggMutex.Unlock()
}

atomic.StoreInt64(&ro.newMetricsCount, 0)
atomic.StoreInt64(&r.newMetricsCount, 0)

// Only process the metrics in the buffer now. Metrics added while we are
// writing will be sent on the next call.
nBuffer := ro.buffer.Len()
nBatches := nBuffer/ro.MetricBatchSize + 1
nBuffer := r.buffer.Len()
nBatches := nBuffer/r.MetricBatchSize + 1
for i := 0; i < nBatches; i++ {
batch := ro.buffer.Batch(ro.MetricBatchSize)
batch := r.buffer.Batch(r.MetricBatchSize)
if len(batch) == 0 {
break
}

err := ro.write(batch)
err := r.write(batch)
if err != nil {
ro.buffer.Reject(batch)
r.buffer.Reject(batch)
return err
}
ro.buffer.Accept(batch)
r.buffer.Accept(batch)
}
return nil
}

// WriteBatch writes a single batch of metrics to the output.
func (ro *RunningOutput) WriteBatch() error {
batch := ro.buffer.Batch(ro.MetricBatchSize)
func (r *RunningOutput) WriteBatch() error {
batch := r.buffer.Batch(r.MetricBatchSize)
if len(batch) == 0 {
return nil
}

err := ro.write(batch)
err := r.write(batch)
if err != nil {
ro.buffer.Reject(batch)
r.buffer.Reject(batch)
return err
}
ro.buffer.Accept(batch)
r.buffer.Accept(batch)

return nil
}
Expand Down
32 changes: 16 additions & 16 deletions models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (rp *RunningProcessor) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}

func (r *RunningProcessor) Init() error {
if p, ok := r.Processor.(telegraf.Initializer); ok {
func (rp *RunningProcessor) Init() error {
if p, ok := rp.Processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
Expand All @@ -62,39 +62,39 @@ func (r *RunningProcessor) Init() error {
return nil
}

func (r *RunningProcessor) Log() telegraf.Logger {
return r.log
func (rp *RunningProcessor) Log() telegraf.Logger {
return rp.log
}

func (r *RunningProcessor) LogName() string {
return logName("processors", r.Config.Name, r.Config.Alias)
func (rp *RunningProcessor) LogName() string {
return logName("processors", rp.Config.Name, rp.Config.Alias)
}

func (r *RunningProcessor) MakeMetric(metric telegraf.Metric) telegraf.Metric {
func (rp *RunningProcessor) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}

func (r *RunningProcessor) Start(acc telegraf.Accumulator) error {
return r.Processor.Start(acc)
func (rp *RunningProcessor) Start(acc telegraf.Accumulator) error {
return rp.Processor.Start(acc)
}

func (r *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
if ok := r.Config.Filter.Select(m); !ok {
func (rp *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
if ok := rp.Config.Filter.Select(m); !ok {
// pass downstream
acc.AddMetric(m)
return nil
}

r.Config.Filter.Modify(m)
rp.Config.Filter.Modify(m)
if len(m.FieldList()) == 0 {
// drop metric
r.metricFiltered(m)
rp.metricFiltered(m)
return nil
}

return r.Processor.Add(m, acc)
return rp.Processor.Add(m, acc)
}

func (r *RunningProcessor) Stop() {
r.Processor.Stop()
func (rp *RunningProcessor) Stop() {
rp.Processor.Stop()
}
6 changes: 1 addition & 5 deletions plugins/common/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,5 @@ func (k *Config) SetConfig(config *sarama.Config) error {
config.Net.TLS.Enable = true
}

if err := k.SetSASLConfig(config); err != nil {
return err
}

return nil
return k.SetSASLConfig(config)
}
6 changes: 3 additions & 3 deletions plugins/common/shim/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ func (s *Shim) LoadConfig(filePath *string) error {
}
if conf.Input != nil {
if err = s.AddInput(conf.Input); err != nil {
return fmt.Errorf("Failed to add Input: %w", err)
return fmt.Errorf("failed to add Input: %w", err)
}
} else if conf.Processor != nil {
if err = s.AddStreamingProcessor(conf.Processor); err != nil {
return fmt.Errorf("Failed to add Processor: %w", err)
return fmt.Errorf("failed to add Processor: %w", err)
}
} else if conf.Output != nil {
if err = s.AddOutput(conf.Output); err != nil {
return fmt.Errorf("Failed to add Output: %w", err)
return fmt.Errorf("failed to add Output: %w", err)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/aerospike/aerospike_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestSelectNamepsacesIntegration(t *testing.T) {
count := 0
for _, p := range acc.Metrics {
if p.Measurement == "aerospike_namespace" {
count += 1
count++
}
}
assert.Equal(t, count, 1)
Expand Down
6 changes: 2 additions & 4 deletions plugins/inputs/bond/bond.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ func (bond *Bond) gatherSlavePart(bondName string, rawFile string, acc telegraf.
acc.AddFields("bond_slave", fields, tags)
}
}
if err := scanner.Err(); err != nil {
return err
}
return nil

return scanner.Err()
}

// loadPath can be used to read path firstly from config
Expand Down
Loading

0 comments on commit d9736d5

Please sign in to comment.