From 7c9b402ae48ced0e75fc637bad01e551e8b1f6f7 Mon Sep 17 00:00:00 2001 From: Ian Date: Fri, 26 Jan 2024 15:14:41 -0800 Subject: [PATCH] Allowing otel to stdout also for debugging (#656) --- cmd/ktranslate/main.go | 3 + config.go | 2 +- go.mod | 3 +- go.sum | 2 + pkg/formats/format.go | 2 +- pkg/formats/otel/otel.go | 144 +++++++++++++++++++++++---------------- 6 files changed, 95 insertions(+), 61 deletions(-) diff --git a/cmd/ktranslate/main.go b/cmd/ktranslate/main.go index e247aa72..411f9600 100644 --- a/cmd/ktranslate/main.go +++ b/cmd/ktranslate/main.go @@ -435,6 +435,9 @@ func applyFlags(cfg *ktranslate.Config) error { return } cfg.PrometheusFormat.FlowsNeeded = v + // pkg/formats/otel + case "otel.endpoint": + cfg.OtelFormat.Endpoint = val // pkg/formats/influxdb case "influxdb_measurement_prefix": cfg.InfluxDBFormat.MeasurementPrefix = val diff --git a/config.go b/config.go index 18998229..7c2088a9 100644 --- a/config.go +++ b/config.go @@ -368,7 +368,7 @@ func DefaultConfig() *Config { FlowsNeeded: 10, }, OtelFormat: &OtelFormatConfig{ - Endpoint: "", + Endpoint: "stdout", FlowsNeeded: 10, }, InfluxDBFormat: &InfluxDBFormatConfig{ diff --git a/go.mod b/go.mod index 5d2023b4..9609272b 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,8 @@ require ( github.com/syndtr/goleveldb v1.0.0 go.opentelemetry.io/otel v1.22.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.45.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.45.0 + go.opentelemetry.io/otel/metric v1.22.0 go.opentelemetry.io/otel/sdk/metric v1.22.0 go.starlark.net v0.0.0-20220926145019-14b050677505 google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 @@ -124,7 +126,6 @@ require ( github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.mongodb.org/mongo-driver v1.12.1 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/otel/metric v1.22.0 // indirect go.opentelemetry.io/otel/sdk v1.22.0 // indirect go.opentelemetry.io/otel/trace v1.22.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect diff --git a/go.sum b/go.sum index ac96221f..94e3535e 100644 --- a/go.sum +++ b/go.sum @@ -462,6 +462,8 @@ go.opentelemetry.io/otel v1.22.0 h1:xS7Ku+7yTFvDfDraDIJVpw7XPyuHlB9MCiqqX5mcJ6Y= go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.45.0 h1:+RbSCde0ERway5FwKvXR3aRJIFeDu9rtwC6E7BC6uoM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.45.0/go.mod h1:zcI8u2EJxbLPyoZ3SkVAAcQPgYb1TDRzW93xLFnsggU= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.45.0 h1:NjN6zc7Mwy9torqa3mo+pMJ3mHoPI0uzVSYcqB2t72A= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.45.0/go.mod h1:U+T5v2bk4fCC8XdSEWZja3Pm/ZhvV/zE7JwX/ELJKts= go.opentelemetry.io/otel/metric v1.22.0 h1:lypMQnGyJYeuYPhOM/bgjbFM6WE44W1/T45er4d8Hhg= go.opentelemetry.io/otel/metric v1.22.0/go.mod h1:evJGjVpZv0mQ5QBRJoBF64yMuOf4xCWdXjK8pzFvliY= go.opentelemetry.io/otel/sdk v1.22.0 h1:6coWHw9xw7EfClIC/+O31R8IY3/+EiRFHevmHafB2Gw= diff --git a/pkg/formats/format.go b/pkg/formats/format.go index 4921ce4e..c96905cd 100644 --- a/pkg/formats/format.go +++ b/pkg/formats/format.go @@ -79,7 +79,7 @@ func NewFormat(format Format, log logger.Underlying, registry go_metrics.Registr case FORMAT_PROM_REMOTE: return prom.NewRemoteFormat(log, compression, cfg.PrometheusFormat) case FORMAT_OTEL: - return otel.NewFormat(log, compression, cfg.OtelFormat) + return otel.NewFormat(log, cfg.OtelFormat) default: return nil, fmt.Errorf("You used an unsupported format: %v.", format) } diff --git a/pkg/formats/otel/otel.go b/pkg/formats/otel/otel.go index 0246b99a..18c22d2e 100644 --- a/pkg/formats/otel/otel.go +++ b/pkg/formats/otel/otel.go @@ -2,6 +2,7 @@ package otel import ( "context" + "flag" "strconv" "sync" @@ -14,40 +15,54 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/metric" sdkmetric "go.opentelemetry.io/otel/sdk/metric" ) type OtelFormat struct { logger.ContextL - compression kt.Compression - doGz bool lastMetadata map[string]*kt.LastMetadata mux sync.RWMutex - exp *otlpmetrichttp.Exporter - vecTags tagVec - seen map[string]int + exp sdkmetric.Exporter invalids map[string]bool config *ktranslate.OtelFormatConfig - vecs map[string]metric.Float64Counter + vecs map[string]metric.Float64Histogram ctx context.Context } var ( - otelm = otel.Meter("base") + endpoint string + otelm metric.Meter ) -func NewFormat(log logger.Underlying, compression kt.Compression, cfg *ktranslate.OtelFormatConfig) (*OtelFormat, error) { +func init() { + flag.StringVar(&endpoint, "otel.endpoint", "", "Send data to this endpoint or stdout") +} + +func NewFormat(log logger.Underlying, cfg *ktranslate.OtelFormatConfig) (*OtelFormat, error) { jf := &OtelFormat{ - compression: compression, - ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "nrmFormat"}, log), + ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "otel"}, log), lastMetadata: map[string]*kt.LastMetadata{}, + invalids: map[string]bool{}, ctx: context.Background(), + vecs: map[string]metric.Float64Histogram{}, + config: cfg, } - exp, err := otlpmetrichttp.New(jf.ctx, otlpmetrichttp.WithEndpoint(cfg.Endpoint)) - if err != nil { - return nil, err + var exp sdkmetric.Exporter + if cfg.Endpoint == "stdout" { + metricExporter, err := stdoutmetric.New() + if err != nil { + return nil, err + } + exp = metricExporter + } else { + metricExporter, err := otlpmetrichttp.New(jf.ctx, otlpmetrichttp.WithEndpoint(cfg.Endpoint)) + if err != nil { + return nil, err + } + exp = metricExporter } meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp))) @@ -55,6 +70,7 @@ func NewFormat(log logger.Underlying, compression kt.Compression, cfg *ktranslat jf.exp = exp otelm = otel.Meter("ktranslate") + jf.Infof("Running exporting to %s", cfg.Endpoint) return jf, nil } @@ -73,27 +89,15 @@ func (f *OtelFormat) To(msgs []*kt.JCHF, serBuf []byte) (*kt.Output, error) { defer f.mux.Unlock() for _, m := range res { - if f.seen[m.Name] < f.config.FlowsNeeded { - m.AddTagLabels(f.vecTags) - f.seen[m.Name]++ - if f.seen[m.Name] == f.config.FlowsNeeded { - f.Infof("Seen enough %s!", m.Name) - } else { - f.Infof("Seen %s -> %d", m.Name, f.seen[m.Name]) - } - continue - } - - labels := m.GetTagValues(f.vecTags) if _, ok := f.vecs[m.Name]; !ok { - cv, err := otelm.Float64Counter(m.Name) + cv, err := otelm.Float64Histogram(m.Name) if err != nil { return nil, err } f.vecs[m.Name] = cv - f.Infof("Adding %s %v", m.Name, labels) + f.Infof("Adding %s", m.Name) } - f.vecs[m.Name].Add(f.ctx, m.Value, metric.WithAttributes(labels...)) + f.vecs[m.Name].Record(f.ctx, m.Value, metric.WithAttributeSet(m.GetTagValues())) } return nil, nil @@ -110,6 +114,8 @@ func (f *OtelFormat) Rollup(rolls []rollup.Rollup) (*kt.Output, error) { func (f *OtelFormat) toOtelMetric(in *kt.JCHF) []OtelData { switch in.EventType { + case kt.KENTIK_EVENT_TYPE: + return f.fromKflow(in) case kt.KENTIK_EVENT_SYNTH: return f.fromKSynth(in) case kt.KENTIK_EVENT_SYNTH_GEST: @@ -187,7 +193,7 @@ func (f *OtelFormat) fromKSyngest(in *kt.JCHF) []OtelData { for m, name := range metrics { if in.CustomInt[m] > 0 { ms = append(ms, OtelData{ - Name: "kentik:syngest:" + name.Name, + Name: "kentik.syngest." + name.Name, Value: float64(in.CustomInt[m]), Tags: attr, }) @@ -242,7 +248,7 @@ func (f *OtelFormat) fromKSynth(in *kt.JCHF) []OtelData { switch name.Name { case "avg_rtt", "jit_rtt", "time", "code", "port", "status", "ttlb", "size", "trx_time", "validation", "lost", "sent": ms = append(ms, OtelData{ - Name: "kentik:synth:" + name.Name, + Name: "kentik.synth." + name.Name, Value: float64(in.CustomInt[m]), Tags: attr, }) @@ -252,41 +258,63 @@ func (f *OtelFormat) fromKSynth(in *kt.JCHF) []OtelData { return ms } +func (f *OtelFormat) fromKflow(in *kt.JCHF) []OtelData { + // Map the basic strings into here. + attr := map[string]interface{}{} + metrics := map[string]kt.MetricInfo{"in_bytes": kt.MetricInfo{}, "out_bytes": kt.MetricInfo{}, "in_pkts": kt.MetricInfo{}, "out_pkts": kt.MetricInfo{}, "latency_ms": kt.MetricInfo{}} + f.mux.RLock() + util.SetAttr(attr, in, metrics, f.lastMetadata[in.DeviceName], false) + f.mux.RUnlock() + ms := map[string]int64{} + for m, _ := range metrics { + switch m { + case "in_bytes": + ms[m] = int64(in.InBytes * uint64(in.SampleRate)) + case "out_bytes": + ms[m] = int64(in.OutBytes * uint64(in.SampleRate)) + case "in_pkts": + ms[m] = int64(in.InPkts * uint64(in.SampleRate)) + case "out_pkts": + ms[m] = int64(in.OutPkts * uint64(in.SampleRate)) + case "latency_ms": + ms[m] = int64(in.CustomInt["appl_latency_ms"]) + } + } + + res := []OtelData{} + for k, v := range ms { + if v == 0 { // Drop zero valued metrics here. + continue + } + res = append(res, OtelData{ + Name: "kentik.flow." + k, + Value: float64(v), + Tags: attr, + }) + } + + return res +} + type OtelData struct { Name string Value float64 Tags map[string]interface{} } -func (d *OtelData) AddTagLabels(vecTags tagVec) { - if _, ok := vecTags[d.Name]; !ok { - vecTags[d.Name] = make([]attribute.KeyValue, 0) - } +func (d *OtelData) GetTagValues() attribute.Set { + res := make([]attribute.KeyValue, 0, len(d.Tags)) for k, v := range d.Tags { - found := false - for _, kk := range vecTags[d.Name] { - if string(kk.Key) == k { - found = true - break // Key already exists. - } - } - // If here, new key. - if !found { - switch t := v.(type) { - case string: - vecTags[d.Name] = append(vecTags[d.Name], attribute.String(k, t)) - case int64: - vecTags[d.Name] = append(vecTags[d.Name], attribute.Int64(k, t)) - case float64: - vecTags[d.Name] = append(vecTags[d.Name], attribute.Float64(k, t)) - default: - } + switch t := v.(type) { + case string: + res = append(res, attribute.String(k, t)) + case int64: + res = append(res, attribute.Int64(k, t)) + case float64: + res = append(res, attribute.Float64(k, t)) + default: } } + s, _ := attribute.NewSetWithFiltered(res, func(kv attribute.KeyValue) bool { return true }) + return s } - -func (d *OtelData) GetTagValues(vecTags tagVec) []attribute.KeyValue { - return vecTags[d.Name] -} - -type tagVec map[string][]attribute.KeyValue