From e30b3f4390bb9947e2fa5c01968ef5e0b43a5339 Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Thu, 30 Jan 2025 10:02:12 +0000 Subject: [PATCH] tmp - good idea with grouping --- .../05_typical_pipeline_labels_wrappers.txt | 12 - .../06_typical_pipeline_grouping.txt | 12 + internal/component/discovery/discovery.go | 21 + .../component/discovery/relabel/relabel.go | 2 - internal/component/discovery/target.go | 372 ++++++++---------- .../component/discovery/target_builder.go | 31 +- internal/component/discovery/target_test.go | 4 +- 7 files changed, 208 insertions(+), 246 deletions(-) create mode 100644 internal/component/discovery/benchmark_data/06_typical_pipeline_grouping.txt diff --git a/internal/component/discovery/benchmark_data/05_typical_pipeline_labels_wrappers.txt b/internal/component/discovery/benchmark_data/05_typical_pipeline_labels_wrappers.txt index b98882f97c..e69de29bb2 100644 --- a/internal/component/discovery/benchmark_data/05_typical_pipeline_labels_wrappers.txt +++ b/internal/component/discovery/benchmark_data/05_typical_pipeline_labels_wrappers.txt @@ -1,12 +0,0 @@ -goos: darwin -goarch: arm64 -pkg: github.com/grafana/alloy/internal/component/discovery -cpu: Apple M2 -Benchmark_Targets_TypicalPipeline-8 33 34000016 ns/op 20898478 B/op 160937 allocs/op -Benchmark_Targets_TypicalPipeline-8 32 36584542 ns/op 20826227 B/op 160935 allocs/op -Benchmark_Targets_TypicalPipeline-8 30 35797154 ns/op 20827766 B/op 160935 allocs/op -Benchmark_Targets_TypicalPipeline-8 33 34531398 ns/op 20899688 B/op 160941 allocs/op -Benchmark_Targets_TypicalPipeline-8 32 35422260 ns/op 20827490 B/op 160933 allocs/op -Benchmark_Targets_TypicalPipeline-8 33 34420867 ns/op 20897838 B/op 160939 allocs/op -PASS -ok github.com/grafana/alloy/internal/component/discovery 8.385s diff --git a/internal/component/discovery/benchmark_data/06_typical_pipeline_grouping.txt b/internal/component/discovery/benchmark_data/06_typical_pipeline_grouping.txt new file mode 100644 index 0000000000..ee431c0947 --- /dev/null +++ b/internal/component/discovery/benchmark_data/06_typical_pipeline_grouping.txt @@ -0,0 +1,12 @@ +goos: darwin +goarch: arm64 +pkg: github.com/grafana/alloy/internal/component/discovery +cpu: Apple M2 +Benchmark_Targets_TypicalPipeline-8 36 32660016 ns/op 10841145 B/op 80541 allocs/op +Benchmark_Targets_TypicalPipeline-8 36 32693987 ns/op 10927902 B/op 80541 allocs/op +Benchmark_Targets_TypicalPipeline-8 37 35438566 ns/op 10927976 B/op 80543 allocs/op +Benchmark_Targets_TypicalPipeline-8 34 32530555 ns/op 10927663 B/op 80540 allocs/op +Benchmark_Targets_TypicalPipeline-8 30 38546656 ns/op 10840045 B/op 80538 allocs/op +Benchmark_Targets_TypicalPipeline-8 34 32129576 ns/op 10927397 B/op 80537 allocs/op +PASS +ok github.com/grafana/alloy/internal/component/discovery 9.699s diff --git a/internal/component/discovery/discovery.go b/internal/component/discovery/discovery.go index dd05704778..40cfe406b2 100644 --- a/internal/component/discovery/discovery.go +++ b/internal/component/discovery/discovery.go @@ -220,6 +220,7 @@ func (c *Component) runDiscovery(ctx context.Context, d DiscovererWithMetrics) { } func toAlloyTargets(cache map[string]*targetgroup.Group) []Target { + // logDebugInfo(cache) targetsCount := 0 for _, group := range cache { targetsCount += len(group.Targets) @@ -235,4 +236,24 @@ func toAlloyTargets(cache map[string]*targetgroup.Group) []Target { return allTargets } +// +// var lastLogTime = time.Now() +// func logDebugInfo(c map[string]*targetgroup.Group) { +// if time.Since(lastLogTime) < time.Second*10 { +// return +// } +// numGroups := len(c) +// fmt.Printf("DEBUG ============================================\n") +// fmt.Printf("numGroups: %v\n", numGroups) +// for name, group := range c { +// fmt.Printf("group: %q, common labels: %v, targets: %v\n", name, len(group.Labels), len(group.Targets)) +// avgTargetLabels := 0 +// for _, target := range group.Targets { +// avgTargetLabels += len(target) +// } +// fmt.Printf("avgTargetLabels: %v\n", float32(avgTargetLabels)/float32(len(group.Targets))) +// } +// lastLogTime = time.Now() +// } + func (c *Component) LiveDebugging(_ int) {} diff --git a/internal/component/discovery/relabel/relabel.go b/internal/component/discovery/relabel/relabel.go index aa259f2d03..ae2302c405 100644 --- a/internal/component/discovery/relabel/relabel.go +++ b/internal/component/discovery/relabel/relabel.go @@ -83,8 +83,6 @@ func (c *Component) Update(args component.Arguments) error { targets := make([]discovery.Target, 0, len(newArgs.Targets)) - // TODO(thampiotr): can do even better perhaps by relabeling all the group ones separately from all the - // individual ones for _, t := range newArgs.Targets { var ( relabelled discovery.Target diff --git a/internal/component/discovery/target.go b/internal/component/discovery/target.go index 9866efeef4..8dba1f5bab 100644 --- a/internal/component/discovery/target.go +++ b/internal/component/discovery/target.go @@ -1,6 +1,5 @@ package discovery -import "C" import ( "fmt" "slices" @@ -8,60 +7,97 @@ import ( "github.com/cespare/xxhash/v2" commonlabels "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" modellabels "github.com/prometheus/prometheus/model/labels" "github.com/grafana/alloy/syntax" ) -/* -NEW IDEA: -- Target is an interface -- Make it immutable -- there are two underlying implementations: two LabelSet based and one Labels based -- we can strategically convert between these formats using constructors etc: - - e.g. use discovery's format of LabelSet - - change to Labels only for relabelling, or if no relabelling, only targets that we scrape after - clustering calculations? - - have a more efficient hashing for distributed_targets using LabelSet? -*/ - type Target struct { // NOTE: it is essential that equality between targets continues to work as it is used by Alloy runtime to // decide whether updates need to be propagated throughout the pipeline. See tests. - // TODO(thampiotr): target comparison with different implementations - labels commonLabels + + // TODO(thampiotr): equality may still be a problem in theory here if we regroup these + + group commonlabels.LabelSet + own commonlabels.LabelSet + size int } var ( + seps = []byte{'\xff'} + _ syntax.Capsule = Target{} _ syntax.ConvertibleIntoCapsule = Target{} _ syntax.ConvertibleFromCapsule = &Target{} ) +func ComponentTargetsToPromTargetGroups(jobName string, tgs []Target) map[string][]*targetgroup.Group { + targetsWithCommonGroupLabels := map[uint64][]Target{} + for _, t := range tgs { + fp := t.GroupLabelsHash() // TODO(thampiotr): if worried about collisions we can use string? + targetsWithCommonGroupLabels[fp] = append(targetsWithCommonGroupLabels[fp], t) + } + + allGroups := make(map[string][]*targetgroup.Group, len(targetsWithCommonGroupLabels)) + + groupIndex := 0 + for _, targetsInGroup := range targetsWithCommonGroupLabels { + groupName := fmt.Sprintf("%s_%d", jobName, groupIndex) + groupTargets := make([]commonlabels.LabelSet, len(targetsInGroup)) + for i, target := range targetsInGroup { + groupTargets[i] = target.own + } + promGroup := &targetgroup.Group{ + Source: groupName, + Labels: targetsInGroup[0].group, + } + allGroups[groupName] = append(allGroups[groupName], promGroup) + } + return allGroups +} + func NewEmptyTarget() Target { return NewTargetFromLabelSet(make(commonlabels.LabelSet)) } func NewTargetFromLabelSet(targetLabels commonlabels.LabelSet) Target { return Target{ - labels: newDiscoveryLabelSet(nil, targetLabels), + own: targetLabels, } } -// TODO(thampiotr): 27% allocs // TODO(thampiotr): discovery.* -func NewTargetFromSpecificAndBaseLabelSet(specific, group commonlabels.LabelSet) Target { - return Target{ - labels: newDiscoveryLabelSet(group, specific), +func NewTargetFromSpecificAndBaseLabelSet(own, group commonlabels.LabelSet) Target { + if group == nil { + group = commonlabels.LabelSet{} + } + if own == nil { + own = commonlabels.LabelSet{} } + ret := Target{ + group: group, + own: own, + } + size := 0 + ret.ForEachLabel(func(key string, value string) bool { + size++ + return true + }) + ret.size = size + return ret } +// NewTargetFromModelLabels creates a target from model Labels. +// NOTE: this is not optimised and should be avoided on a hot path. // TODO(thampiotr): 27% allocs // TODO(thampiotr): discovery.relabel func NewTargetFromModelLabels(labels modellabels.Labels) Target { - return Target{ - labels: newModelLabels(labels), + l := make(commonlabels.LabelSet, len(labels)) + for _, label := range labels { + l[commonlabels.LabelName(label.Name)] = commonlabels.LabelValue(label.Value) } + return NewTargetFromLabelSet(l) } // TODO(thampiotr): this uses LabelSet, but maybe should use model.Labels? @@ -75,8 +111,19 @@ func NewTargetFromMap(m map[string]string) Target { // TODO(thampiotr): 13% allocs // TODO(thampiotr): discovery.relabel +// TODO(thampiotr): get rid of this one!! func (t Target) Labels() modellabels.Labels { - return t.labels.Labels() + // This method allocates less than Builder or ScratchBuilder, as proven by benchmarks. + lb := make([]modellabels.Label, 0, t.Len()) + t.ForEachLabel(func(key string, value string) bool { + lb = append(lb, modellabels.Label{ + Name: key, + Value: value, + }) + return true + }) + slices.SortFunc(lb, func(a, b modellabels.Label) int { return strings.Compare(a.Name, b.Name) }) + return lb } // TODO(thampiotr): 13% allocs @@ -127,7 +174,25 @@ func (t Target) LabelsWithPredicate(pred func(key string) bool) modellabels.Labe // false, the iteration is interrupted. If f returns true, the iteration continues until the last element. ForEachLabel // returns true if all the labels were iterated over or false if any call to f has interrupted the iteration. func (t Target) ForEachLabel(f func(key string, value string) bool) bool { - return t.labels.ForEachLabel(f) + for k, v := range t.own { + if !f(string(k), string(v)) { + // f has returned false, interrupt the iteration and return false. + return false + } + } + // Now go over the group ones only if they were not part of own labels + for k, v := range t.group { + if _, ok := t.own[k]; ok { + continue + } + if !f(string(k), string(v)) { + // f has returned false, interrupt the iteration and return false. + return false + } + } + + // We finished the iteration, return true. + return true } // AsMap returns target's labels as a map of strings. @@ -142,15 +207,29 @@ func (t Target) AsMap() map[string]string { } func (t Target) Get(key string) (string, bool) { - return t.labels.Get(key) + lv, ok := t.own[commonlabels.LabelName(key)] + if ok { + return string(lv), ok + } + lv, ok = t.group[commonlabels.LabelName(key)] + return string(lv), ok } +// LabelSet converts this target in to a LabelSet +// NOTE: this is not optimised and should be avoided if possible. func (t Target) LabelSet() commonlabels.LabelSet { - return t.labels.LabelSet() + merged := make(commonlabels.LabelSet, t.Len()) + for k, v := range t.group { + merged[k] = v + } + for k, v := range t.own { + merged[k] = v + } + return merged } func (t Target) Len() int { - return t.labels.Len() + return t.size } // AlloyCapsule marks FastTarget as a capsule so Alloy syntax can marshal to or from it. @@ -212,215 +291,72 @@ func (t Target) Equal(other Target) bool { } func (t Target) NonMetaLabelsHash() uint64 { - return t.labels.HashLabelsWithPredicate(func(key string) bool { + return t.HashLabelsWithPredicate(func(key string) bool { return !strings.HasPrefix(key, commonlabels.MetaLabelPrefix) }) } func (t Target) SpecificLabelsHash(labelNames []string) uint64 { - return t.labels.HashLabelsWithPredicate(func(key string) bool { + return t.HashLabelsWithPredicate(func(key string) bool { return slices.Contains(labelNames, key) }) } -type commonLabels interface { - // LabelSet returns the labels as prometheus/common/model/LabelSet. - // NOTE: this may be inefficient depending on underlying implementation. - LabelSet() commonlabels.LabelSet - // Labels returns the labels as prometheus/prometheus/model/labels/Labels - // NOTE: this may be inefficient depending on underlying implementation. - Labels() modellabels.Labels - Get(key string) (string, bool) - ForEachLabel(f func(key string, value string) bool) bool - HashLabelsWithPredicate(pred func(key string) bool) uint64 - Len() int -} - -type modelLabels struct { - labels modellabels.Labels -} - -func newModelLabels(labels modellabels.Labels) modelLabels { - return modelLabels{ - labels: labels, - } -} - -func (m modelLabels) LabelSet() commonlabels.LabelSet { - result := make(commonlabels.LabelSet, m.Len()) - m.ForEachLabel(func(key string, value string) bool { - result[commonlabels.LabelName(key)] = commonlabels.LabelValue(value) - return true - }) - return result -} - -func (m modelLabels) Get(key string) (string, bool) { - val := m.labels.Get(key) - if val == "" { - return "", m.labels.Has(key) - } - return val, true -} - -func (m modelLabels) ForEachLabel(f func(key string, value string) bool) bool { - for _, l := range m.labels { - if !f(l.Name, l.Value) { - return false - } - } - return true -} - -var seps = []byte{'\xff'} - -func (m modelLabels) HashLabelsWithPredicate(pred func(key string) bool) uint64 { - // Optimisation to use xxhash.Sum64(b) is adapted from Prometheus. - b := make([]byte, 0, 1024) - for i, v := range m.labels { - if !pred(v.Name) { - continue // Skip label if not fulfilling the predicate. - } - if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { - // If labels entry is 1KB+ do not allocate whole entry. - h := xxhash.New() - _, _ = h.Write(b) - for _, v := range m.labels[i:] { - _, _ = h.WriteString(v.Name) - _, _ = h.Write(seps) - _, _ = h.WriteString(v.Value) - _, _ = h.Write(seps) +func (t Target) HashLabelsWithPredicate(pred func(key string) bool) uint64 { + { + // For hash to be deterministic, we need labels order to be deterministic too. Figure this out first. + labelsInOrder := make([]string, 0, t.Len()) // TODO(thampiotr): this can go to object pool? + t.ForEachLabel(func(key string, value string) bool { + if pred(value) { + labelsInOrder = append(labelsInOrder, key) } - return h.Sum64() - } - - b = append(b, v.Name...) - b = append(b, seps[0]) - b = append(b, v.Value...) - b = append(b, seps[0]) - } - return xxhash.Sum64(b) -} - -func (m modelLabels) Labels() modellabels.Labels { - return m.labels -} - -func (m modelLabels) Len() int { - return len(m.labels) -} - -type discoveryLabelSet struct { - group commonlabels.LabelSet - own commonlabels.LabelSet - size int -} - -func newDiscoveryLabelSet(group commonlabels.LabelSet, own commonlabels.LabelSet) discoveryLabelSet { - if group == nil { - group = commonlabels.LabelSet{} - } - if own == nil { - own = commonlabels.LabelSet{} - } - ret := discoveryLabelSet{ - group: group, - own: own, - } - size := 0 - ret.ForEachLabel(func(key string, value string) bool { - size++ - return true - }) - ret.size = size - return ret -} - -func (c discoveryLabelSet) Len() int { - return c.size -} - -func (c discoveryLabelSet) LabelSet() commonlabels.LabelSet { - merged := make(commonlabels.LabelSet, c.Len()) - for k, v := range c.group { - merged[k] = v - } - for k, v := range c.own { - merged[k] = v - } - return merged -} - -func (c discoveryLabelSet) Get(key string) (string, bool) { - lv, ok := c.own[commonlabels.LabelName(key)] - if ok { - return string(lv), ok - } - lv, ok = c.group[commonlabels.LabelName(key)] - return string(lv), ok -} + return true + }) + slices.Sort(labelsInOrder) -func (c discoveryLabelSet) ForEachLabel(f func(key string, value string) bool) bool { - for k, v := range c.own { - if !f(string(k), string(v)) { - // f has returned false, interrupt the iteration and return false. - return false + mustGet := func(label string) string { + val, ok := t.Get(label) + if !ok { + panic("label concurrently modified - this is a bug - please report an issue") + } + return val } - } - // Now go over the group ones only if they were not part of own labels - for k, v := range c.group { - if _, ok := c.own[k]; ok { - continue - } - if !f(string(k), string(v)) { - // f has returned false, interrupt the iteration and return false. - return false + // TODO(thampiotr): Could do the same optimisation as in modelLabels to use 1KB buffer. + h := xxhash.New() + for _, label := range labelsInOrder { + _, _ = h.WriteString(label) + _, _ = h.Write(seps) + _, _ = h.WriteString(mustGet(label)) + _, _ = h.Write(seps) } + return h.Sum64() } - - // We finished the iteration, return true. - return true } -func (c discoveryLabelSet) HashLabelsWithPredicate(pred func(key string) bool) uint64 { - // For hash to be deterministic, we need labels order to be deterministic too. Figure this out first. - labelsInOrder := make([]string, 0, c.Len()) // TODO(thampiotr): this can go to object pool? - c.ForEachLabel(func(key string, value string) bool { - if pred(value) { - labelsInOrder = append(labelsInOrder, key) +func (t Target) GroupLabelsHash() uint64 { + { + // For hash to be deterministic, we need labels order to be deterministic too. Figure this out first. + labelsInOrder := make([]string, 0, len(t.group)) // TODO(thampiotr): this can go to object pool? + for name := range t.group { + labelsInOrder = append(labelsInOrder, string(name)) } - return true - }) - slices.Sort(labelsInOrder) + slices.Sort(labelsInOrder) - mustGet := func(label string) string { - val, ok := c.Get(label) - if !ok { - panic("label concurrently modified - this is a bug - please report an issue") + mustGet := func(label string) string { + val, ok := t.Get(label) + if !ok { + panic("label concurrently modified - this is a bug - please report an issue") + } + return val } - return val - } - // TODO(thampiotr): Could do the same optimisation as in modelLabels to use 1KB buffer. - h := xxhash.New() - for _, label := range labelsInOrder { - _, _ = h.WriteString(label) - _, _ = h.Write(seps) - _, _ = h.WriteString(mustGet(label)) - _, _ = h.Write(seps) + // TODO(thampiotr): Could do the same optimisation as in modelLabels to use 1KB buffer. + h := xxhash.New() + for _, label := range labelsInOrder { + _, _ = h.WriteString(label) + _, _ = h.Write(seps) + _, _ = h.WriteString(mustGet(label)) + _, _ = h.Write(seps) + } + return h.Sum64() } - return h.Sum64() -} - -func (c discoveryLabelSet) Labels() modellabels.Labels { - // This method allocates less than Builder or ScratchBuilder, as proven by benchmarks. - lb := make([]modellabels.Label, 0, c.Len()) - c.ForEachLabel(func(key string, value string) bool { - lb = append(lb, modellabels.Label{ - Name: key, - Value: value, - }) - return true - }) - slices.SortFunc(lb, func(a, b modellabels.Label) int { return strings.Compare(a.Name, b.Name) }) - return lb } diff --git a/internal/component/discovery/target_builder.go b/internal/component/discovery/target_builder.go index eab0f55bf6..115c0c0906 100644 --- a/internal/component/discovery/target_builder.go +++ b/internal/component/discovery/target_builder.go @@ -44,6 +44,9 @@ func (m *mapTargetBuilder) Target() Target { return NewTargetFromMap(m.t) } +const initialLabelsBuilderCapacity = 5 + +// TODO(thampiotr): maybe all targets can have toDel map ready? type labelsBuilder struct { group commonlabels.LabelSet own commonlabels.LabelSet @@ -53,24 +56,25 @@ type labelsBuilder struct { } func NewLabelsBuilderFromTarget(t Target) relabel.LabelBuilder { - if lst, ok := t.labels.(discoveryLabelSet); ok { - return labelsBuilder{ - group: lst.group, - own: lst.own, - toAdd: make(map[string]string, 5), - toDel: make(map[string]struct{}, 5), - } + return &labelsBuilder{ + group: t.group, + own: t.own, + // TODO(thampiotr): we could postulate that builder is throw-away after .ToTarget() and use object pool for these. + toAdd: make(map[string]string, initialLabelsBuilderCapacity), + toDel: make(map[string]struct{}, initialLabelsBuilderCapacity), } - return &labelsBuilder{} } func NewTargetFromLabelsBuilder(lb relabel.LabelBuilder) Target { // Use optimised path if possible - if ilb, ok := lb.(labelsBuilder); ok { - return ilb.ToTarget() + if ilb, ok := lb.(*labelsBuilder); ok { + return ilb.ToTarget() // TODO(thampiotr): it could be part of interface and we'd avoid this type checking } - // Otherwise, non-optimised merge. + // TODO(thampiotr): remove this, just checking for now + panic("not right - need to optimise this!") + + // Otherwise, non-optimised merge for other implementations. res := make(commonlabels.LabelSet) lb.Range(func(k, v string) { res[commonlabels.LabelName(k)] = commonlabels.LabelValue(v) @@ -156,6 +160,8 @@ func (l labelsBuilder) ToTarget() Target { ) if modifyOwn { + // TODO(thampiotr): further benchmarking is needed here, but if this is causing a lot of allocation, we could + // try implementing Target with toAdd and toDel overlays instead. newOwn = make(commonlabels.LabelSet, len(l.own)+len(l.toAdd)) for k, v := range l.own { if _, ok := l.toDel[string(k)]; ok { @@ -168,6 +174,9 @@ func (l labelsBuilder) ToTarget() Target { } } if modifyGroup { + // TODO(thampiotr): When relabeling a lot of targets we possibly will produce a lot of l.groups that will + // all be the same. So maybe we need a step to consolidate them? Or rewrite the relabeling + // logic to work on collection of targets with groups. newGroup = make(commonlabels.LabelSet, len(l.group)) for k, v := range l.group { if _, ok := l.toDel[string(k)]; ok { diff --git a/internal/component/discovery/target_test.go b/internal/component/discovery/target_test.go index 9d8bb81c7b..76be3dc8ed 100644 --- a/internal/component/discovery/target_test.go +++ b/internal/component/discovery/target_test.go @@ -159,9 +159,7 @@ func Benchmark_Targets_TypicalPipeline(b *testing.B) { _ = dt.LocalTargets() _ = dt.MovedToRemoteInstance(prevDistTargets) // Sending LabelSet to Prometheus library for scraping - for _, target := range targets { - _ = target.LabelSet() - } + _ = ComponentTargetsToPromTargetGroups("test", targets) // Remote write happens on a sample level and largely outside Alloy's codebase, so skipping here.