Skip to content

Commit

Permalink
Allowing otel to stdout also for debugging (#656)
Browse files Browse the repository at this point in the history
  • Loading branch information
i3149 authored Jan 26, 2024
1 parent 2750e5f commit 7c9b402
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 61 deletions.
3 changes: 3 additions & 0 deletions cmd/ktranslate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ func applyFlags(cfg *ktranslate.Config) error {
return
}
cfg.PrometheusFormat.FlowsNeeded = v
// pkg/formats/otel
case "otel.endpoint":
cfg.OtelFormat.Endpoint = val
// pkg/formats/influxdb
case "influxdb_measurement_prefix":
cfg.InfluxDBFormat.MeasurementPrefix = val
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func DefaultConfig() *Config {
FlowsNeeded: 10,
},
OtelFormat: &OtelFormatConfig{
Endpoint: "",
Endpoint: "stdout",
FlowsNeeded: 10,
},
InfluxDBFormat: &InfluxDBFormatConfig{
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ require (
github.com/syndtr/goleveldb v1.0.0
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.45.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.45.0
go.opentelemetry.io/otel/metric v1.22.0
go.opentelemetry.io/otel/sdk/metric v1.22.0
go.starlark.net v0.0.0-20220926145019-14b050677505
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97
Expand Down Expand Up @@ -124,7 +126,6 @@ require (
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.mongodb.org/mongo-driver v1.12.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.22.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ go.opentelemetry.io/otel v1.22.0 h1:xS7Ku+7yTFvDfDraDIJVpw7XPyuHlB9MCiqqX5mcJ6Y=
go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.45.0 h1:+RbSCde0ERway5FwKvXR3aRJIFeDu9rtwC6E7BC6uoM=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.45.0/go.mod h1:zcI8u2EJxbLPyoZ3SkVAAcQPgYb1TDRzW93xLFnsggU=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.45.0 h1:NjN6zc7Mwy9torqa3mo+pMJ3mHoPI0uzVSYcqB2t72A=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.45.0/go.mod h1:U+T5v2bk4fCC8XdSEWZja3Pm/ZhvV/zE7JwX/ELJKts=
go.opentelemetry.io/otel/metric v1.22.0 h1:lypMQnGyJYeuYPhOM/bgjbFM6WE44W1/T45er4d8Hhg=
go.opentelemetry.io/otel/metric v1.22.0/go.mod h1:evJGjVpZv0mQ5QBRJoBF64yMuOf4xCWdXjK8pzFvliY=
go.opentelemetry.io/otel/sdk v1.22.0 h1:6coWHw9xw7EfClIC/+O31R8IY3/+EiRFHevmHafB2Gw=
Expand Down
2 changes: 1 addition & 1 deletion pkg/formats/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewFormat(format Format, log logger.Underlying, registry go_metrics.Registr
case FORMAT_PROM_REMOTE:
return prom.NewRemoteFormat(log, compression, cfg.PrometheusFormat)
case FORMAT_OTEL:
return otel.NewFormat(log, compression, cfg.OtelFormat)
return otel.NewFormat(log, cfg.OtelFormat)
default:
return nil, fmt.Errorf("You used an unsupported format: %v.", format)
}
Expand Down
144 changes: 86 additions & 58 deletions pkg/formats/otel/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package otel

import (
"context"
"flag"
"strconv"
"sync"

Expand All @@ -14,47 +15,62 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

type OtelFormat struct {
logger.ContextL
compression kt.Compression
doGz bool
lastMetadata map[string]*kt.LastMetadata
mux sync.RWMutex
exp *otlpmetrichttp.Exporter
vecTags tagVec
seen map[string]int
exp sdkmetric.Exporter
invalids map[string]bool
config *ktranslate.OtelFormatConfig
vecs map[string]metric.Float64Counter
vecs map[string]metric.Float64Histogram
ctx context.Context
}

var (
otelm = otel.Meter("base")
endpoint string
otelm metric.Meter
)

func NewFormat(log logger.Underlying, compression kt.Compression, cfg *ktranslate.OtelFormatConfig) (*OtelFormat, error) {
func init() {
flag.StringVar(&endpoint, "otel.endpoint", "", "Send data to this endpoint or stdout")
}

func NewFormat(log logger.Underlying, cfg *ktranslate.OtelFormatConfig) (*OtelFormat, error) {
jf := &OtelFormat{
compression: compression,
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "nrmFormat"}, log),
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "otel"}, log),
lastMetadata: map[string]*kt.LastMetadata{},
invalids: map[string]bool{},
ctx: context.Background(),
vecs: map[string]metric.Float64Histogram{},
config: cfg,
}

exp, err := otlpmetrichttp.New(jf.ctx, otlpmetrichttp.WithEndpoint(cfg.Endpoint))
if err != nil {
return nil, err
var exp sdkmetric.Exporter
if cfg.Endpoint == "stdout" {
metricExporter, err := stdoutmetric.New()
if err != nil {
return nil, err
}
exp = metricExporter
} else {
metricExporter, err := otlpmetrichttp.New(jf.ctx, otlpmetrichttp.WithEndpoint(cfg.Endpoint))
if err != nil {
return nil, err
}
exp = metricExporter
}

meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp)))
otel.SetMeterProvider(meterProvider)
jf.exp = exp

otelm = otel.Meter("ktranslate")
jf.Infof("Running exporting to %s", cfg.Endpoint)

return jf, nil
}
Expand All @@ -73,27 +89,15 @@ func (f *OtelFormat) To(msgs []*kt.JCHF, serBuf []byte) (*kt.Output, error) {
defer f.mux.Unlock()

for _, m := range res {
if f.seen[m.Name] < f.config.FlowsNeeded {
m.AddTagLabels(f.vecTags)
f.seen[m.Name]++
if f.seen[m.Name] == f.config.FlowsNeeded {
f.Infof("Seen enough %s!", m.Name)
} else {
f.Infof("Seen %s -> %d", m.Name, f.seen[m.Name])
}
continue
}

labels := m.GetTagValues(f.vecTags)
if _, ok := f.vecs[m.Name]; !ok {
cv, err := otelm.Float64Counter(m.Name)
cv, err := otelm.Float64Histogram(m.Name)
if err != nil {
return nil, err
}
f.vecs[m.Name] = cv
f.Infof("Adding %s %v", m.Name, labels)
f.Infof("Adding %s", m.Name)
}
f.vecs[m.Name].Add(f.ctx, m.Value, metric.WithAttributes(labels...))
f.vecs[m.Name].Record(f.ctx, m.Value, metric.WithAttributeSet(m.GetTagValues()))
}

return nil, nil
Expand All @@ -110,6 +114,8 @@ func (f *OtelFormat) Rollup(rolls []rollup.Rollup) (*kt.Output, error) {

func (f *OtelFormat) toOtelMetric(in *kt.JCHF) []OtelData {
switch in.EventType {
case kt.KENTIK_EVENT_TYPE:
return f.fromKflow(in)
case kt.KENTIK_EVENT_SYNTH:
return f.fromKSynth(in)
case kt.KENTIK_EVENT_SYNTH_GEST:
Expand Down Expand Up @@ -187,7 +193,7 @@ func (f *OtelFormat) fromKSyngest(in *kt.JCHF) []OtelData {
for m, name := range metrics {
if in.CustomInt[m] > 0 {
ms = append(ms, OtelData{
Name: "kentik:syngest:" + name.Name,
Name: "kentik.syngest." + name.Name,
Value: float64(in.CustomInt[m]),
Tags: attr,
})
Expand Down Expand Up @@ -242,7 +248,7 @@ func (f *OtelFormat) fromKSynth(in *kt.JCHF) []OtelData {
switch name.Name {
case "avg_rtt", "jit_rtt", "time", "code", "port", "status", "ttlb", "size", "trx_time", "validation", "lost", "sent":
ms = append(ms, OtelData{
Name: "kentik:synth:" + name.Name,
Name: "kentik.synth." + name.Name,
Value: float64(in.CustomInt[m]),
Tags: attr,
})
Expand All @@ -252,41 +258,63 @@ func (f *OtelFormat) fromKSynth(in *kt.JCHF) []OtelData {
return ms
}

func (f *OtelFormat) fromKflow(in *kt.JCHF) []OtelData {
// Map the basic strings into here.
attr := map[string]interface{}{}
metrics := map[string]kt.MetricInfo{"in_bytes": kt.MetricInfo{}, "out_bytes": kt.MetricInfo{}, "in_pkts": kt.MetricInfo{}, "out_pkts": kt.MetricInfo{}, "latency_ms": kt.MetricInfo{}}
f.mux.RLock()
util.SetAttr(attr, in, metrics, f.lastMetadata[in.DeviceName], false)
f.mux.RUnlock()
ms := map[string]int64{}
for m, _ := range metrics {
switch m {
case "in_bytes":
ms[m] = int64(in.InBytes * uint64(in.SampleRate))
case "out_bytes":
ms[m] = int64(in.OutBytes * uint64(in.SampleRate))
case "in_pkts":
ms[m] = int64(in.InPkts * uint64(in.SampleRate))
case "out_pkts":
ms[m] = int64(in.OutPkts * uint64(in.SampleRate))
case "latency_ms":
ms[m] = int64(in.CustomInt["appl_latency_ms"])
}
}

res := []OtelData{}
for k, v := range ms {
if v == 0 { // Drop zero valued metrics here.
continue
}
res = append(res, OtelData{
Name: "kentik.flow." + k,
Value: float64(v),
Tags: attr,
})
}

return res
}

type OtelData struct {
Name string
Value float64
Tags map[string]interface{}
}

func (d *OtelData) AddTagLabels(vecTags tagVec) {
if _, ok := vecTags[d.Name]; !ok {
vecTags[d.Name] = make([]attribute.KeyValue, 0)
}
func (d *OtelData) GetTagValues() attribute.Set {
res := make([]attribute.KeyValue, 0, len(d.Tags))
for k, v := range d.Tags {
found := false
for _, kk := range vecTags[d.Name] {
if string(kk.Key) == k {
found = true
break // Key already exists.
}
}
// If here, new key.
if !found {
switch t := v.(type) {
case string:
vecTags[d.Name] = append(vecTags[d.Name], attribute.String(k, t))
case int64:
vecTags[d.Name] = append(vecTags[d.Name], attribute.Int64(k, t))
case float64:
vecTags[d.Name] = append(vecTags[d.Name], attribute.Float64(k, t))
default:
}
switch t := v.(type) {
case string:
res = append(res, attribute.String(k, t))
case int64:
res = append(res, attribute.Int64(k, t))
case float64:
res = append(res, attribute.Float64(k, t))
default:
}
}
s, _ := attribute.NewSetWithFiltered(res, func(kv attribute.KeyValue) bool { return true })
return s
}

func (d *OtelData) GetTagValues(vecTags tagVec) []attribute.KeyValue {
return vecTags[d.Name]
}

type tagVec map[string][]attribute.KeyValue

0 comments on commit 7c9b402

Please sign in to comment.