Skip to content

Commit

Permalink
feat(outputs.kafka): Option to set producer message timestamp (influx…
Browse files Browse the repository at this point in the history
  • Loading branch information
powersj authored Jul 31, 2024
1 parent b090cf6 commit 2cab6ec
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
6 changes: 6 additions & 0 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ to use them.
## smaller than the broker's 'message.max.bytes'.
# max_message_bytes = 1000000

## Producer timestamp
## This option sets the timestamp of the kafka producer message, choose from:
## * metric: Uses the metric's timestamp
## * now: Uses the time of write
# producer_timestamp = metric

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down
25 changes: 17 additions & 8 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ var ValidTopicSuffixMethods = []string{
var zeroTime = time.Unix(0, 0)

type Kafka struct {
Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
TopicTag string `toml:"topic_tag"`
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
TopicTag string `toml:"topic_tag"`
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
ProducerTimestamp string `toml:"producer_timestamp"`

proxy.Socks5ProxyConfig

Expand Down Expand Up @@ -152,6 +153,14 @@ func (k *Kafka) Init() error {
}
k.saramaConfig = config

switch k.ProducerTimestamp {
case "":
k.ProducerTimestamp = "metric"
case "metric", "now":
default:
return fmt.Errorf("unknown producer_timestamp option: %s", k.ProducerTimestamp)
}

return nil
}

Expand Down Expand Up @@ -207,7 +216,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
}

// Negative timestamps are not allowed by the Kafka protocol.
if !metric.Time().Before(zeroTime) {
if k.ProducerTimestamp == "metric" && !metric.Time().Before(zeroTime) {
m.Timestamp = metric.Time()
}

Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/kafka/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@
## smaller than the broker's 'message.max.bytes'.
# max_message_bytes = 1000000

## Producer timestamp
## This option sets the timestamp of the kafka producer message, choose from:
## * metric: Uses the metric's timestamp
## * now: Uses the time of write
# producer_timestamp = metric

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down

0 comments on commit 2cab6ec

Please sign in to comment.