Skip to content

Commit

Permalink
Better way to send into prom (#631)
Browse files Browse the repository at this point in the history
* Fixing a race condition where some labels are misapplied
  • Loading branch information
i3149 authored Nov 7, 2023
1 parent c7af6dc commit 728f362
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 45 deletions.
16 changes: 16 additions & 0 deletions pkg/formats/prom/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package prom

import (
"github.com/json-iterator/go"
)

var (
	// json is the package-wide codec, backed by jsoniter's ConfigFastest
	// preset. It shadows the usual encoding/json identifier within this package.
	json = jsoniter.ConfigFastest

	// jsonSorted is a frozen jsoniter codec that writes map keys in sorted
	// order with a 4-step indent, leaves HTML characters unescaped, and does
	// not validate json.RawMessage values.
	jsonSorted = jsoniter.Config{
		SortMapKeys:            true,
		IndentionStep:          4,
		EscapeHTML:             false,
		ValidateJsonRawMessage: false,
	}.Froze()
)
174 changes: 134 additions & 40 deletions pkg/formats/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prom
import (
"flag"
"fmt"
"strconv"
"strings"
"sync"

Expand All @@ -23,7 +24,7 @@ var (

func init() {
flag.BoolVar(&doCollectorStats, "info_collector", false, "Also send stats about this collector")
flag.IntVar(&seenNeeded, "prom_seen", 10, "Number of flows needed inbound before we start writting to the collector")
flag.IntVar(&seenNeeded, "prom_seen", 4, "Number of flows needed inbound before we start writting to the collector")

}

Expand Down Expand Up @@ -67,11 +68,11 @@ type tagVec map[string]map[string]int

type PromFormat struct {
logger.ContextL
vecs map[string]*prometheus.CounterVec
vecs map[string]*prometheus.GaugeVec
invalids map[string]bool
lastMetadata map[string]*kt.LastMetadata
vecTags tagVec
seen int
seen map[string]int
config *ktranslate.PrometheusFormatConfig

mux sync.RWMutex
Expand All @@ -83,11 +84,12 @@ func NewFormat(log logger.Underlying, compression kt.Compression, cfg *ktranslat
}
jf := &PromFormat{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "promFormat"}, log),
vecs: make(map[string]*prometheus.CounterVec),
vecs: make(map[string]*prometheus.GaugeVec),
invalids: map[string]bool{},
lastMetadata: map[string]*kt.LastMetadata{},
vecTags: map[string]map[string]int{},
config: cfg,
seen: map[string]int{},
}

if cfg.EnableCollectorStats {
Expand All @@ -97,12 +99,6 @@ func NewFormat(log logger.Underlying, compression kt.Compression, cfg *ktranslat
return jf, nil
}

func (f *PromFormat) addLabels(res []PromData) {
for _, m := range res {
m.AddTagLabels(f.vecTags)
}
}

func (f *PromFormat) toLabels(name string) []string {
res := make([]string, len(f.vecTags[name]))
for k, v := range f.vecTags[name] {
Expand All @@ -124,22 +120,22 @@ func (f *PromFormat) To(msgs []*kt.JCHF, serBuf []byte) (*kt.Output, error) {
f.mux.Lock()
defer f.mux.Unlock()

if f.seen < f.config.FlowsNeeded {
f.addLabels(res)
f.seen++
if f.seen == f.config.FlowsNeeded {
f.Infof("Seen enough!")
} else {
f.Infof("Seen %d", f.seen)
for _, m := range res {
if f.seen[m.Name] < f.config.FlowsNeeded {
m.AddTagLabels(f.vecTags)
f.seen[m.Name]++
if f.seen[m.Name] == f.config.FlowsNeeded {
f.Infof("Seen enough %s!", m.Name)
} else {
f.Infof("Seen %s -> %d", m.Name, f.seen[m.Name])
}
continue
}
return nil, nil
}

for _, m := range res {
if _, ok := f.vecs[m.Name]; !ok {
labels := f.toLabels(m.Name)
cv := prometheus.NewCounterVec(
prometheus.CounterOpts{
cv := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: m.Name,
},
labels,
Expand All @@ -148,6 +144,7 @@ func (f *PromFormat) To(msgs []*kt.JCHF, serBuf []byte) (*kt.Output, error) {
f.vecs[m.Name] = cv
f.Infof("Adding %s %v", m.Name, labels)
}
//f.Infof("%s, %v, %v %v", m.Name, m.Tags, f.vecTags[m.Name], f.toLabels(m.Name))
f.vecs[m.Name].WithLabelValues(m.GetTagValues(f.vecTags)...).Add(m.Value)
}

Expand All @@ -166,8 +163,8 @@ func (f *PromFormat) Rollup(rolls []rollup.Rollup) (*kt.Output, error) {
continue
}
if _, ok := f.vecs[roll.EventType]; !ok {
f.vecs[roll.EventType] = prometheus.NewCounterVec(
prometheus.CounterOpts{
f.vecs[roll.EventType] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: strings.ReplaceAll(roll.Name, ".", ":"),
},
roll.GetDims(),
Expand All @@ -190,6 +187,8 @@ func (f *PromFormat) toPromMetric(in *kt.JCHF) []PromData {
return f.fromSnmpInterfaceMetric(in)
case kt.KENTIK_EVENT_SYNTH:
return f.fromKSynth(in)
case kt.KENTIK_EVENT_SYNTH_GEST:
return f.fromKSyngest(in)
case kt.KENTIK_EVENT_SNMP_METADATA:
return f.fromSnmpMetadata(in)
default:
Expand All @@ -204,35 +203,130 @@ func (f *PromFormat) toPromMetric(in *kt.JCHF) []PromData {
return nil
}

// synthWLAttr is the allow-list of attribute names that synthetic-test
// metrics keep as tags; any attribute whose name is absent here (or mapped
// to false) is dropped before the metric is emitted.
var synthWLAttr = map[string]bool{
	// Agent, test and provider identity.
	"agent_id":   true,
	"agent_name": true,
	"provider":   true,
	"test_id":    true,
	"test_name":  true,
	"test_type":  true,
	"test_url":   true,

	// Source side.
	"src_addr":           true,
	"src_as_name":        true,
	"src_cdn_int":        true,
	"src_cloud_provider": true,
	"src_cloud_region":   true,
	"src_geo":            true,
	"src_host":           true,
	"src_site":           true,

	// Destination side.
	"dest_ip":            true,
	"dst_addr":           true,
	"dst_cdn_int":        true,
	"dst_cloud_provider": true,
	"dst_cloud_region":   true,
	"dst_geo":            true,
	"dst_host":           true,
	"dst_site":           true,

	// Result details; these mirror the entries of synthAttrKeys.
	"statusMessage":          true,
	"statusEncoding":         true,
	"https_validity":         true,
	"https_expiry_timestamp": true,
}

// synthAttrKeys lists, in lookup order, the keys that are copied out of a
// decoded result payload into the attribute set when present.
var synthAttrKeys = []string{
	"statusMessage",
	"statusEncoding",
	"https_validity",
	"https_expiry_timestamp",
}

// fromKSyngest turns one syngest event into Prometheus samples.
//
// Attributes are gathered through util.SetAttr (reading lastMetadata under
// the read lock), reduced to the names allowed by synthWLAttr, and test_id
// is rewritten as a decimal string when it arrives as an int. One PromData
// named "kentik:syngest:<metric>" is produced for every metric in the
// syngest name set whose CustomInt value is strictly positive.
//
// Every returned PromData references the same Tags map.
func (f *PromFormat) fromKSyngest(in *kt.JCHF) []PromData {
	names := util.GetSyngestMetricNameSet()
	tags := make(map[string]interface{})

	f.mux.RLock()
	util.SetAttr(tags, in, names, f.lastMetadata[in.DeviceName], false)
	f.mux.RUnlock()

	// Keep only allow-listed attributes. Deleting or overwriting existing
	// keys while ranging over a map is permitted in Go.
	for key, val := range tags {
		if allowed := synthWLAttr[key]; !allowed {
			delete(tags, key)
		}
		if key != "test_id" {
			continue
		}
		// Force test_id to be a string.
		if id, isInt := val.(int); isInt {
			tags[key] = strconv.Itoa(id)
		}
	}

	out := make([]PromData, 0, len(names))
	for column, metric := range names {
		value := in.CustomInt[column]
		if value <= 0 {
			continue // missing, zero and negative readings are not exported
		}
		out = append(out, PromData{
			Name:  "kentik:syngest:" + metric.Name,
			Value: float64(value),
			Tags:  tags,
		})
	}

	return out
}

func (f *PromFormat) fromKSynth(in *kt.JCHF) []PromData {
if in.CustomInt["result_type"] <= 1 {
return nil // Don't worry about timeouts and errors for now.
}

rawStr := in.CustomStr["error_cause/trace_route"] // Pull this out early.
metrics := util.GetSynMetricNameSet(in.CustomInt["result_type"])
attr := map[string]interface{}{}
f.mux.RLock()
util.SetAttr(attr, in, metrics, f.lastMetadata[in.DeviceName], false)
f.mux.RUnlock()
ms := map[string]int64{}
ms := make([]PromData, 0, len(metrics))

// If there's str00 data, try to unserialize and pass in useful bits.
if rawStr != "" {
strData := []interface{}{}
if err := json.Unmarshal([]byte(rawStr), &strData); err == nil {
if len(strData) > 0 {
switch sd := strData[0].(type) {
case map[string]interface{}:
for _, key := range synthAttrKeys {
if val, ok := sd[key]; ok {
attr[key] = val
}
}
}
}
}
}

for m, name := range metrics {
switch m {
case "error", "timeout":
ms[name.Name] = 1
default:
if in.CustomInt["result_type"] > 1 {
ms[name.Name] = int64(in.CustomInt[m])
for k, v := range attr { // White list only a few attributes here.
if !synthWLAttr[k] {
delete(attr, k)
}
if k == "test_id" { // Force this to be a string.
if vi, ok := v.(int); ok {
attr[k] = strconv.Itoa(vi)
}
}
}

res := []PromData{}
for k, v := range ms {
res = append(res, PromData{
Name: "kentik:synth:" + k,
Value: float64(v),
Tags: attr,
})
for m, name := range metrics {
switch name.Name {
case "avg_rtt", "jit_rtt", "time", "code", "port", "status", "ttlb", "size", "trx_time", "validation", "lost", "sent":
ms = append(ms, PromData{
Name: "kentik:synth:" + name.Name,
Value: float64(in.CustomInt[m]),
Tags: attr,
})
}
}

return res
return ms
}

func (f *PromFormat) fromKflow(in *kt.JCHF) []PromData {
Expand Down
10 changes: 5 additions & 5 deletions pkg/sinks/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
)

func init() {
flag.StringVar(&listen, "prom_listen", ":8082", "Bind to listen for prometheus requests on.")
flag.StringVar(&listen, "prom_listen", "127.0.0.1:8083", "Bind to listen for prometheus requests on.")
flag.StringVar(&remoteUrl, "prom_remote_write", "", "Pass on remote write to this address.")
}

Expand Down Expand Up @@ -91,6 +91,10 @@ func (s *PromSink) Init(ctx context.Context, format formats.Format, compression
return fmt.Errorf("You must set the -prom_remote_write flag to make this work.")
}

if compression != kt.CompressionSnappy {
return fmt.Errorf("You used the %s unsupported compression format. Use snappy only.", compression)
}

s.Infof("Sending to remote_write endpoint %s", remoteUrl)
default:
return fmt.Errorf("Prometheus only supports %s and %s formats, not %s", formats.FORMAT_PROM, formats.FORMAT_PROM_REMOTE, format)
Expand All @@ -99,10 +103,6 @@ func (s *PromSink) Init(ctx context.Context, format formats.Format, compression
s.remoteUrl = remoteUrl
s.compression = compression

if s.compression != kt.CompressionSnappy {
return fmt.Errorf("You used the %s unsupported compression format. Use snappy only.", s.compression)
}

return nil
}

Expand Down

0 comments on commit 728f362

Please sign in to comment.