Skip to content

Commit

Permalink
fix(agent): Respect processor order in file (influxdata#13614)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Jul 14, 2023
1 parent de8a9c5 commit a72b859
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 7 deletions.
15 changes: 8 additions & 7 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log"
"os"
"runtime"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -603,14 +602,16 @@ func (a *Agent) startProcessors(
dst chan<- telegraf.Metric,
processors models.RunningProcessors,
) (chan<- telegraf.Metric, []*processorUnit, error) {
// Sort from last to first
sort.SliceStable(processors, func(i, j int) bool {
return processors[i].Config.Order > processors[j].Config.Order
})

var src chan telegraf.Metric
units := make([]*processorUnit, 0, len(processors))
for _, processor := range processors {
// The processor chain is constructed from the output side starting from
// the output(s) and walking the way back to the input(s). However, the
// processor-list is sorted by order and/or by appearance in the config,
// i.e. in input-to-output direction. Therefore, reverse the processor list
// to reflect the order/definition order in the processing chain.
for i := len(processors) - 1; i >= 0; i-- {
processor := processors[i]

src = make(chan telegraf.Metric, 100)
acc := NewAccumulator(processor, dst)

Expand Down
95 changes: 95 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
package agent

import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/models"
_ "github.com/influxdata/telegraf/plugins/aggregators/all"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
"github.com/influxdata/telegraf/plugins/parsers/influx"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"github.com/influxdata/telegraf/testutil"
)

func TestAgent_OmitHostname(t *testing.T) {
Expand Down Expand Up @@ -165,3 +177,86 @@ func TestWindow(t *testing.T) {
})
}
}

func TestCases(t *testing.T) {
// Get all directories in testcases
folders, err := os.ReadDir("testcases")
require.NoError(t, err)

// Make sure tests contains data
require.NotEmpty(t, folders)

for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}

fname := f.Name()
testdataPath := filepath.Join("testcases", fname)
configFilename := filepath.Join(testdataPath, "telegraf.conf")
expectedFilename := filepath.Join(testdataPath, "expected.out")

t.Run(fname, func(t *testing.T) {
// Get parser to parse input and expected output
parser := &influx.Parser{}
require.NoError(t, parser.Init())

expected, err := testutil.ParseMetricsFromFile(expectedFilename, parser)
require.NoError(t, err)
require.NotEmpty(t, expected)

// Load the config and inject the mock output to be able to verify
// the resulting metrics
cfg := config.NewConfig()
require.NoError(t, cfg.LoadAll(configFilename))
require.Empty(t, cfg.Outputs, "No output(s) allowed in the config!")

// Setup the agent and run the agent in "once" mode
agent := NewAgent(cfg)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
actual, err := collect(ctx, agent, 0)
require.NoError(t, err)

// Process expected metrics and compare with resulting metrics
options := []cmp.Option{
testutil.IgnoreTags("host"),
}
if expected[0].Time().IsZero() {
options = append(options, testutil.IgnoreTime())
}
testutil.RequireMetricsEqual(t, expected, actual, options...)
})
}
}

// Implement a "test-mode" like call but collect the metrics
func collect(ctx context.Context, a *Agent, wait time.Duration) ([]telegraf.Metric, error) {
var received []telegraf.Metric
var mu sync.Mutex

src := make(chan telegraf.Metric, 100)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for m := range src {
mu.Lock()
received = append(received, m)
mu.Unlock()
m.Reject()
}
}()

if err := a.runTest(ctx, wait, src); err != nil {
return nil, err
}
wg.Wait()

if models.GlobalGatherErrors.Get() != 0 {
return received, fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
}
return received, nil
}
2 changes: 2 additions & 0 deletions agent/testcases/processor-order-appearance/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
new_metric_from_starlark,foo=bar baz=42i,timestamp="2023-07-13T12:53:54.197709713Z" 1689252834197709713
old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000
1 change: 1 addition & 0 deletions agent/testcases/processor-order-appearance/input.influx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
old_metric_from_mock,mood=good value=23i 1689253834000000000
26 changes: 26 additions & 0 deletions agent/testcases/processor-order-appearance/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Test for using the appearance order in the file for processor order
[[inputs.file]]
files = ["testcases/processor-order-appearance/input.influx"]
data_format = "influx"

[[processors.starlark]]
source = '''
def apply(metric):
metrics = []

m = Metric("new_metric_from_starlark")
m.tags["foo"] = "bar"
m.fields["baz"] = 42
m.time = 1689252834197709713
metrics.append(m)
metrics.append(metric)

return metrics
'''

[[processors.date]]
field_key = "timestamp"
date_format = "2006-01-02T15:04:05.999999999Z"
timezone = "UTC"


2 changes: 2 additions & 0 deletions agent/testcases/processor-order-explicit/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
new_metric_from_starlark,foo=bar baz=42i,timestamp="2023-07-13T12:53:54.197709713Z" 1689252834197709713
old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000
1 change: 1 addition & 0 deletions agent/testcases/processor-order-explicit/input.influx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
old_metric_from_mock,mood=good value=23i 1689253834000000000
27 changes: 27 additions & 0 deletions agent/testcases/processor-order-explicit/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Test for specifying an explicit processor order
[[inputs.file]]
files = ["testcases/processor-order-explicit/input.influx"]
data_format = "influx"


[[processors.date]]
field_key = "timestamp"
date_format = "2006-01-02T15:04:05.999999999Z"
timezone = "UTC"
order = 2

[[processors.starlark]]
source = '''
def apply(metric):
metrics = []

m = Metric("new_metric_from_starlark")
m.tags["foo"] = "bar"
m.fields["baz"] = 42
m.time = 1689252834197709713
metrics.append(m)
metrics.append(metric)

return metrics
'''
order = 1
2 changes: 2 additions & 0 deletions agent/testcases/processor-order-mixed/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
new_metric_from_starlark,foo=bar baz=42i,timestamp="2023-07-13T12:53:54.197709713Z" 1689252834197709713
old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000
1 change: 1 addition & 0 deletions agent/testcases/processor-order-mixed/input.influx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
old_metric_from_mock,mood=good value=23i 1689253834000000000
25 changes: 25 additions & 0 deletions agent/testcases/processor-order-mixed/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Test for using the appearance order in the file for processor order
[[inputs.file]]
files = ["testcases/processor-order-appearance/input.influx"]
data_format = "influx"

[[processors.starlark]]
source = '''
def apply(metric):
metrics = []

m = Metric("new_metric_from_starlark")
m.tags["foo"] = "bar"
m.fields["baz"] = 42
m.time = 1689252834197709713
metrics.append(m)
metrics.append(metric)

return metrics
'''

[[processors.date]]
field_key = "timestamp"
date_format = "2006-01-02T15:04:05.999999999Z"
timezone = "UTC"
order = 1
2 changes: 2 additions & 0 deletions agent/testcases/processor-order-no-starlark/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
new_metric_from_starlark,foo=bar baz=42i 1689252834197709713
old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000
1 change: 1 addition & 0 deletions agent/testcases/processor-order-no-starlark/input.influx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
old_metric_from_mock,mood=good value=23i 1689253834000000000
26 changes: 26 additions & 0 deletions agent/testcases/processor-order-no-starlark/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Test for using the appearance order in the file for processor order.
# This will not add the "timestamp" field as the starlark processor runs _after_
# the date processor.
[[inputs.file]]
files = ["testcases/processor-order-no-starlark/input.influx"]
data_format = "influx"

[[processors.date]]
field_key = "timestamp"
date_format = "2006-01-02T15:04:05.999999999Z"
timezone = "UTC"

[[processors.starlark]]
source = '''
def apply(metric):
metrics = []

m = Metric("new_metric_from_starlark")
m.tags["foo"] = "bar"
m.fields["baz"] = 42
m.time = 1689252834197709713
metrics.append(m)
metrics.append(metric)

return metrics
'''

0 comments on commit a72b859

Please sign in to comment.