forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added NATS server container needed for tests. Added NATS output plug-in. Fixes influxdata#1487 NATS output plug-in use internal.GetTLSConfig to instrument TLS configuration. Added NATS output plug-in to changelog. closes influxdata#1487 closes influxdata#1697
- Loading branch information
Showing
8 changed files
with
206 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# NATS Output Plugin | ||
|
||
This plugin writes to a (list of) specified NATS instance(s). | ||
|
||
``` | ||
[[outputs.nats]] | ||
## URLs of NATS servers | ||
servers = ["nats://localhost:4222"] | ||
## Optional credentials | ||
# username = "" | ||
# password = "" | ||
## NATS subject for producer messages | ||
subject = "telegraf" | ||
## Optional TLS Config | ||
## CA certificate used to self-sign NATS server(s) TLS certificate(s) | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
## Use TLS but skip chain & host verification | ||
# insecure_skip_verify = false | ||
## Data format to output. | ||
## Each data format has it's own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md | ||
data_format = "influx" | ||
``` | ||
|
||
### Required parameters: | ||
|
||
* `servers`: List of strings, this is for NATS clustering support. Each URL should start with `nats://`. | ||
* `subject`: The NATS subject to publish to. | ||
|
||
### Optional parameters: | ||
|
||
* `username`: Username for NATS | ||
* `password`: Password for NATS | ||
* `tls_ca`: TLS CA | ||
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
package nats | ||
|
||
import ( | ||
"fmt" | ||
|
||
nats_client "github.com/nats-io/nats" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/plugins/outputs" | ||
"github.com/influxdata/telegraf/plugins/serializers" | ||
) | ||
|
||
type NATS struct { | ||
// Servers is the NATS server pool to connect to | ||
Servers []string | ||
// Credentials | ||
Username string | ||
Password string | ||
// NATS subject to publish metrics to | ||
Subject string | ||
|
||
// Path to CA file | ||
CAFile string `toml:"tls_ca"` | ||
|
||
// Skip SSL verification | ||
InsecureSkipVerify bool | ||
|
||
conn *nats_client.Conn | ||
serializer serializers.Serializer | ||
} | ||
|
||
var sampleConfig = ` | ||
## URLs of NATS servers | ||
servers = ["nats://localhost:4222"] | ||
## Optional credentials | ||
# username = "" | ||
# password = "" | ||
## NATS subject for producer messages | ||
subject = "telegraf" | ||
## Optional TLS Config | ||
## CA certificate used to self-sign NATS server(s) TLS certificate(s) | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
## Use TLS but skip chain & host verification | ||
# insecure_skip_verify = false | ||
## Data format to output. | ||
## Each data format has it's own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md | ||
data_format = "influx" | ||
` | ||
|
||
func (n *NATS) SetSerializer(serializer serializers.Serializer) { | ||
n.serializer = serializer | ||
} | ||
|
||
func (n *NATS) Connect() error { | ||
var err error | ||
// set NATS connection options | ||
opts := nats_client.DefaultOptions | ||
opts.Servers = n.Servers | ||
if n.Username != "" { | ||
opts.User = n.Username | ||
opts.Password = n.Password | ||
} | ||
|
||
// is TLS enabled? | ||
tlsConfig, err := internal.GetTLSConfig( | ||
"", "", n.CAFile, n.InsecureSkipVerify) | ||
if err != nil { | ||
return err | ||
} | ||
if tlsConfig != nil { | ||
// set NATS connection TLS options | ||
opts.Secure = true | ||
opts.TLSConfig = tlsConfig | ||
} | ||
|
||
// try and connect | ||
n.conn, err = opts.Connect() | ||
|
||
return err | ||
} | ||
|
||
func (n *NATS) Close() error { | ||
n.conn.Close() | ||
return nil | ||
} | ||
|
||
func (n *NATS) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (n *NATS) Description() string { | ||
return "Send telegraf measurements to NATS" | ||
} | ||
|
||
func (n *NATS) Write(metrics []telegraf.Metric) error { | ||
if len(metrics) == 0 { | ||
return nil | ||
} | ||
|
||
for _, metric := range metrics { | ||
values, err := n.serializer.Serialize(metric) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var pubErr error | ||
for _, value := range values { | ||
err = n.conn.Publish(n.Subject, []byte(value)) | ||
if err != nil { | ||
pubErr = err | ||
} | ||
} | ||
|
||
if pubErr != nil { | ||
return fmt.Errorf("FAILED to send NATS message: %s", err) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func init() { | ||
outputs.Add("nats", func() telegraf.Output { | ||
return &NATS{} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package nats | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/influxdata/telegraf/plugins/serializers" | ||
"github.com/influxdata/telegraf/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestConnectAndWrite(t *testing.T) { | ||
if testing.Short() { | ||
t.Skip("Skipping integration test in short mode") | ||
} | ||
|
||
server := []string{"nats://" + testutil.GetLocalHost() + ":4222"} | ||
s, _ := serializers.NewInfluxSerializer() | ||
n := &NATS{ | ||
Servers: server, | ||
Subject: "telegraf", | ||
serializer: s, | ||
} | ||
|
||
// Verify that we can connect to the NATS daemon | ||
err := n.Connect() | ||
require.NoError(t, err) | ||
|
||
// Verify that we can successfully write data to the NATS daemon | ||
err = n.Write(testutil.MockMetrics()) | ||
require.NoError(t, err) | ||
} |