feat(parser.prometheusremotewrite, serializer.prometheusremotewrite): Native histogram support end-to-end #16121

Closed · wants to merge 9 commits
2 changes: 1 addition & 1 deletion go.mod
@@ -91,6 +91,7 @@ require (
github.com/gobwas/glob v0.2.3
github.com/godbus/dbus/v5 v5.1.0
github.com/gofrs/uuid/v5 v5.3.0
github.com/gogo/protobuf v1.3.2
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/golang/snappy v0.0.4
@@ -341,7 +342,6 @@ require (
github.com/goccy/go-json v0.10.3 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
3 changes: 3 additions & 0 deletions plugins/parsers/prometheusremotewrite/README.md
@@ -16,6 +16,9 @@ additional configuration options for Prometheus Remote Write Samples.

## Data format to consume.
data_format = "prometheusremotewrite"

## Whether to parse a native histogram into one Telegraf metric
keep_native_histograms_atomic = false
Member:

Can we please use metric_version similar to what is done in the prometheus parser?

Author:

Switching to metric_version as in the prometheus parser seems to have broader implications than just how histograms are parsed.

Now, on the prometheusremotewrite side, the current implementation seems equivalent to v2. I could emulate a v1 here, where atomic parsing of both classic and native histograms would be implemented.


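For reference, a minimal sketch of how the proposed option would be used in a Telegraf config; the surrounding http_listener_v2 input and its settings are illustrative, not part of this diff:

```
[[inputs.http_listener_v2]]
  service_address = ":1234"
  paths = ["/receive"]
  ## Data format to consume
  data_format = "prometheusremotewrite"
  ## Keep each native histogram as a single Telegraf metric (option proposed in this PR)
  keep_native_histograms_atomic = true
```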
## Example Input
70 changes: 44 additions & 26 deletions plugins/parsers/prometheusremotewrite/parser.go
@@ -9,13 +9,16 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"

"github.com/gogo/protobuf/proto"
Member:

Why do you need this instead of the upstream and well-maintained github.com/golang/protobuf package?

Author:

This is what Prometheus uses to serialize its protobuf payloads in remote write.

Author:

I just found the rationale for them using this protobuf lib: prometheus/prometheus#14668.
It seems they plan to migrate away from it to a better-maintained lib, so we should probably just do the same.

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)

type Parser struct {
DefaultTags map[string]string

KeepNativeHistogramsAtomic bool `toml:"keep_native_histograms_atomic"`
}

func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
@@ -61,42 +64,57 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
}

for _, hp := range ts.Histograms {
	if hp.Timestamp > 0 {
		t = time.Unix(0, hp.Timestamp*1000000)
	}
	if p.KeepNativeHistogramsAtomic {
		// If keeping it atomic, we serialize the histogram into one single Telegraf metric
		// For now we keep the histogram as a serialized proto
		// Another option is to convert it to multi-field Telegraf metric
		serialized, err := proto.Marshal(&hp)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal histogram: %w", err)
		}
		fields := map[string]any{
			metricName: string(serialized),
		}
Comment on lines +71 to +80
Member:

You really should go for the multi-field option, as you cannot do anything with the serialized format in Telegraf other than pass it through to the prometheusremotewrite serializer...

Author (@Reimirno, Nov 5, 2024):

Totally agree, I wanted to do that too, but it's actually not straightforward.
The Prometheus native histogram protobuf has quite a few fields that are arrays, and even arrays of structs. There is no good way to represent those in a Telegraf metric field (value data types: Float | Integer | UInteger | String | Boolean).
(The same goes for the OTel exponential histogram.)
Maybe I am missing something - could use some advice :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In particular, the problematic fields are (a rough sketch of this flattening follows the list):

  • count (and zero_count): a oneof in protobuf.
    This exists to handle both the int histogram and the float histogram. We can break it up into two fields, countInt and countFloat, and potentially add an isFloatHistogram flag to achieve a lossless conversion. Or we can convert everything to a float histogram and only store a float count, which is what the current implementation in the Telegraf prometheusremotewrite parser does.

  • negative_counts, negative_deltas, etc.: int or float arrays.
    We can break them down into index-suffixed fields like negative_counts_0=... negative_counts_1=...

  • negative_spans and positive_spans: arrays of BucketSpan, which has two fields, offset and length.
    We can break them down into index-and-field-suffixed fields like negative_spans_0_offset=xxx negative_spans_0_length=xxx ...
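For illustration only (not part of this PR), a minimal sketch of what such a flattening could look like; the field names (countInt, isFloatHistogram, positive_spans_<i>_offset, ...) are hypothetical, not an agreed-upon convention:

```
package flatten

import (
	"fmt"

	"github.com/prometheus/prometheus/prompb"
)

// flattenNativeHistogram turns a prompb.Histogram into flat Telegraf-style fields.
// Array and array-of-struct fields are expanded into index-suffixed scalars.
func flattenNativeHistogram(h *prompb.Histogram) map[string]interface{} {
	fields := map[string]interface{}{
		"sum":            h.GetSum(),
		"schema":         h.GetSchema(),
		"zero_threshold": h.GetZeroThreshold(),
	}

	// The count oneof: keep whichever variant is set, plus a marker flag.
	if _, isFloat := h.GetCount().(*prompb.Histogram_CountFloat); isFloat {
		fields["countFloat"] = h.GetCountFloat()
		fields["isFloatHistogram"] = true
	} else {
		fields["countInt"] = h.GetCountInt()
		fields["isFloatHistogram"] = false
	}

	// Arrays of structs become index-and-field-suffixed scalars.
	for i, span := range h.GetPositiveSpans() {
		fields[fmt.Sprintf("positive_spans_%d_offset", i)] = span.Offset
		fields[fmt.Sprintf("positive_spans_%d_length", i)] = span.Length
	}
	// Plain arrays become index-suffixed scalars.
	for i, c := range h.GetPositiveCounts() {
		fields[fmt.Sprintf("positive_counts_%d", i)] = c
	}
	for i, d := range h.GetPositiveDeltas() {
		fields[fmt.Sprintf("positive_deltas_%d", i)] = d
	}
	// negative_spans, negative_counts, negative_deltas and zero_count
	// would be handled the same way.
	return fields
}
```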


		m := metric.New("prometheus_remote_write", tags, fields, t, telegraf.Histogram)
		metrics = append(metrics, m)
	} else {
		h := hp.ToFloatHistogram()

		fields := map[string]any{
			metricName + "_sum": h.Sum,
		}
		m := metric.New("prometheus_remote_write", tags, fields, t)
		metrics = append(metrics, m)

		fields = map[string]any{
			metricName + "_count": h.Count,
		}
		m = metric.New("prometheus_remote_write", tags, fields, t)
		metrics = append(metrics, m)

		count := 0.0
		iter := h.AllBucketIterator()
		for iter.Next() {
			bucket := iter.At()

			count = count + bucket.Count
			fields = map[string]any{
				metricName: count,
			}

			localTags := make(map[string]string, len(tags)+1)
			localTags[metricName+"_le"] = fmt.Sprintf("%g", bucket.Upper)
			for k, v := range tags {
				localTags[k] = v
			}

			m := metric.New("prometheus_remote_write", localTags, fields, t)
			metrics = append(metrics, m)
		}
	}
}
}
61 changes: 55 additions & 6 deletions plugins/serializers/prometheusremotewrite/prometheusremotewrite.go
@@ -12,6 +12,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"

"github.com/gogo/protobuf/proto"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
@@ -39,8 +40,8 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
}

var buf bytes.Buffer
var entries = make(map[MetricKey]prompb.TimeSeries)
var labels = make([]prompb.Label, 0)
entries := make(map[MetricKey]prompb.TimeSeries)
labels := make([]prompb.Label, 0)
for _, metric := range metrics {
labels = s.appendCommonLabels(labels[:0], metric)
var metrickey MetricKey
@@ -134,8 +135,28 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {

metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
default:
traceAndKeepErr("failed to parse %q: series %q should have `_count`, `_sum` or `_bucket` suffix", metricName, field.Key)
continue
// If none of the above suffixes is found, this is a native histogram,
// so we unmarshal the proto message back into a Go struct
var h prompb.Histogram

var data []byte
switch v := field.Value.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
traceAndKeepErr("unexpected type for field.Value: %T", field.Value)
continue
}

err := proto.Unmarshal(data, &h)
if err != nil {
traceAndKeepErr("failed to unmarshal native histogram %q: %w", metricName, err)
continue
}

metrickey, promts = getPromNativeHistogramTS(metricName, labels, h, metric.Time())
}
case telegraf.Summary:
switch {
@@ -187,7 +208,13 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
// sample then we can skip over it.
m, ok := entries[metrickey]
if ok {
if metric.Time().Before(time.Unix(0, m.Samples[0].Timestamp*1_000_000)) {
var timestamp int64
if len(m.Samples) > 0 {
timestamp = m.Samples[0].Timestamp
} else {
timestamp = m.Histograms[0].Timestamp
}
if metric.Time().Before(time.Unix(0, timestamp*1_000_000)) {
traceAndKeepErr("metric %q has samples with timestamp %v older than already registered before", metric.Name(), metric.Time())
continue
}
@@ -202,7 +229,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
s.Log.Errorf("some series were dropped, %d series left to send; last recorded error: %v", len(entries), lastErr)
}

var promTS = make([]prompb.TimeSeries, len(entries))
promTS := make([]prompb.TimeSeries, len(entries))
var i int
for _, promts := range entries {
promTS[i] = promts
@@ -338,6 +365,28 @@ func getPromTS(name string, labels []prompb.Label, value float64, ts time.Time,
return MakeMetricKey(labelscopy), prompb.TimeSeries{Labels: labelscopy, Samples: sample}
}

func getPromNativeHistogramTS(name string, labels []prompb.Label, fh prompb.Histogram,
ts time.Time, extraLabels ...prompb.Label) (MetricKey, prompb.TimeSeries) {
labelscopy := make([]prompb.Label, len(labels), len(labels)+1)
copy(labelscopy, labels)

fh.Timestamp = ts.UnixNano() / int64(time.Millisecond)
histograms := []prompb.Histogram{
fh,
}
labelscopy = append(labelscopy, extraLabels...)
labelscopy = append(labelscopy, prompb.Label{
Name: "__name__",
Value: name,
})

// we sort the labels since Prometheus TSDB does not like out of order labels
sort.Sort(sortableLabels(labelscopy))

// for a native histogram, samples are not used; instead, histograms field is used
return MakeMetricKey(labelscopy), prompb.TimeSeries{Labels: labelscopy, Histograms: histograms}
}

type sortableLabels []prompb.Label

func (sl sortableLabels) Len() int { return len(sl) }
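For context (not part of this diff), a minimal sketch of where such histogram-bearing TimeSeries end up, assuming the standard remote-write framing this serializer already uses (gogo proto marshalling of a prompb.WriteRequest followed by snappy compression); the metric name and values are illustrative:

```
package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
)

func main() {
	// One TimeSeries carrying a native histogram in Histograms instead of Samples,
	// as produced by getPromNativeHistogramTS.
	ts := prompb.TimeSeries{
		Labels: []prompb.Label{{Name: "__name__", Value: "http_request_duration_seconds"}},
		Histograms: []prompb.Histogram{{
			Count:  &prompb.Histogram_CountInt{CountInt: 10},
			Sum:    12.5,
			Schema: 3,
		}},
	}

	// Standard remote-write framing: marshal the WriteRequest, then snappy-compress
	// the body before it is sent over HTTP.
	req := prompb.WriteRequest{Timeseries: []prompb.TimeSeries{ts}}
	raw, err := proto.Marshal(&req)
	if err != nil {
		panic(err)
	}
	body := snappy.Encode(nil, raw)
	fmt.Printf("remote-write payload: %d bytes\n", len(body))
}
```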