From dddfa4e7f9ae00b57933fc45c5c343da4489b15d Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Wed, 22 Jan 2025 15:07:06 +0000 Subject: [PATCH] wip - squashed --- internal/component/beyla/ebpf/beyla_linux.go | 6 +- internal/component/common/relabel/process.go | 112 ++++++ .../database_observability/mysql/component.go | 6 +- .../01_typical_pipeline_targets_as_maps.txt | 16 + ...02_typical_pipeline_targets_as_capsule.txt | 16 + .../03_typical_pipeline_optimised.txt | 12 + .../04_typical_pipeline_cached_labels.txt | 12 + .../05_typical_pipeline_labels_wrappers.txt | 0 .../06_typical_pipeline_grouping.txt | 12 + .../07_typical_pipeline_cleanup.txt | 12 + internal/component/discovery/discovery.go | 53 +-- .../component/discovery/discovery_test.go | 65 ++-- .../discovery/distributed_targets.go | 4 +- .../discovery/distributed_targets_test.go | 4 +- .../component/discovery/process/container.go | 6 +- .../component/discovery/process/discover.go | 7 +- internal/component/discovery/process/join.go | 15 +- .../component/discovery/process/join_test.go | 39 ++- .../component/discovery/relabel/relabel.go | 38 +- .../discovery/relabel/relabel_test.go | 5 +- internal/component/discovery/target.go | 331 ++++++++++++++++++ .../component/discovery/target_builder.go | 171 +++++++++ internal/component/discovery/target_test.go | 207 +++++++++++ .../component/local/file_match/file_test.go | 21 +- internal/component/local/file_match/watch.go | 15 +- .../component/loki/process/process_test.go | 2 +- .../component/loki/relabel/relabel_test.go | 2 +- .../component/loki/source/docker/docker.go | 14 +- internal/component/loki/source/file/file.go | 16 +- .../component/loki/source/file/file_test.go | 29 +- .../loki/source/kubernetes/kubernetes_test.go | 14 +- internal/component/loki/write/write_test.go | 2 +- .../prometheus/exporter/blackbox/blackbox.go | 11 +- .../exporter/blackbox/blackbox_test.go | 42 ++- .../component/prometheus/exporter/exporter.go | 12 +- .../prometheus/exporter/kafka/kafka.go | 11 +- .../prometheus/exporter/kafka/kafka_test.go | 7 +- .../prometheus/exporter/snmp/snmp.go | 17 +- .../prometheus/exporter/snmp/snmp_test.go | 21 +- .../component/prometheus/scrape/scrape.go | 47 ++- .../scrape/scrape_clustering_test.go | 4 +- .../component/pyroscope/ebpf/ebpf_linux.go | 11 +- .../pyroscope/ebpf/ebpf_linux_test.go | 8 +- internal/component/pyroscope/java/java.go | 9 +- internal/component/pyroscope/java/loop.go | 11 +- internal/component/pyroscope/java/target.go | 17 +- internal/component/pyroscope/scrape/scrape.go | 20 +- .../pyroscope/scrape/scrape_loop_test.go | 9 +- .../component/pyroscope/scrape/scrape_test.go | 43 +-- .../internal/common/convert_targets.go | 9 +- .../internal/common/convert_targets_test.go | 27 +- .../prometheusconvert/component/scrape.go | 2 +- .../internal/build/service_discovery.go | 2 +- .../converter/internal/test_common/testing.go | 7 +- .../runtime/componenttest/componenttest.go | 12 +- internal/runtime/equality/deep_equal.go | 2 + internal/runtime/equality/equality.go | 123 +++++++ .../controller/node_builtin_component.go | 5 +- .../controller/node_custom_component.go | 4 +- .../internal/controller/node_service.go | 3 +- .../internal/controller/value_cache.go | 4 +- .../internal/importsource/import_file.go | 5 +- .../internal/importsource/import_git.go | 6 +- .../internal/importsource/import_http.go | 4 +- .../internal/importsource/import_string.go | 4 +- .../internal/testcomponents/module/git/git.go | 5 +- .../internal/testcomponents/module/module.go | 4 +- internal/service/ui/ui.go | 1 + .../promsdprocessor/consumer/consumer.go | 34 +- .../promsdprocessor/consumer/consumer_test.go | 9 +- .../promsdprocessor/prom_sd_processor.go | 6 +- .../promsdprocessor/prom_sd_processor_test.go | 15 +- internal/util/testtarget/test_target.go | 4 +- syntax/encoding/alloyjson/alloyjson.go | 52 +-- syntax/encoding/alloyjson/alloyjson_test.go | 43 ++- syntax/internal/value/value.go | 29 +- syntax/token/builder/builder_test.go | 60 +++- syntax/token/builder/value_tokens.go | 66 ++-- syntax/types.go | 6 + 79 files changed, 1617 insertions(+), 500 deletions(-) create mode 100644 internal/component/common/relabel/process.go create mode 100644 internal/component/discovery/benchmark_data/01_typical_pipeline_targets_as_maps.txt create mode 100644 internal/component/discovery/benchmark_data/02_typical_pipeline_targets_as_capsule.txt create mode 100644 internal/component/discovery/benchmark_data/03_typical_pipeline_optimised.txt create mode 100644 internal/component/discovery/benchmark_data/04_typical_pipeline_cached_labels.txt create mode 100644 internal/component/discovery/benchmark_data/05_typical_pipeline_labels_wrappers.txt create mode 100644 internal/component/discovery/benchmark_data/06_typical_pipeline_grouping.txt create mode 100644 internal/component/discovery/benchmark_data/07_typical_pipeline_cleanup.txt create mode 100644 internal/component/discovery/target.go create mode 100644 internal/component/discovery/target_builder.go create mode 100644 internal/component/discovery/target_test.go create mode 100644 internal/runtime/equality/deep_equal.go create mode 100644 internal/runtime/equality/equality.go diff --git a/internal/component/beyla/ebpf/beyla_linux.go b/internal/component/beyla/ebpf/beyla_linux.go index d47845d2c1..a79ab82077 100644 --- a/internal/component/beyla/ebpf/beyla_linux.go +++ b/internal/component/beyla/ebpf/beyla_linux.go @@ -273,17 +273,17 @@ func (c *Component) Update(args component.Arguments) error { func (c *Component) baseTarget() (discovery.Target, error) { data, err := c.opts.GetServiceData(http_service.ServiceName) if err != nil { - return nil, fmt.Errorf("failed to get HTTP information: %w", err) + return discovery.EmptyTarget, fmt.Errorf("failed to get HTTP information: %w", err) } httpData := data.(http_service.Data) - return discovery.Target{ + return discovery.NewTargetFromMap(map[string]string{ model.AddressLabel: httpData.MemoryListenAddr, model.SchemeLabel: "http", model.MetricsPathLabel: path.Join(httpData.HTTPPathForComponent(c.opts.ID), "metrics"), "instance": defaultInstance(), "job": "beyla", - }, nil + }), nil } func (c *Component) reportUnhealthy(err error) { diff --git a/internal/component/common/relabel/process.go b/internal/component/common/relabel/process.go new file mode 100644 index 0000000000..5a99589d06 --- /dev/null +++ b/internal/component/common/relabel/process.go @@ -0,0 +1,112 @@ +package relabel + +// TODO(thampiotr): write comment +// TODO(thampiotr): port the test + +import ( + "crypto/md5" + "encoding/binary" + "fmt" + "strings" + + "github.com/prometheus/common/model" +) + +type LabelBuilder interface { + Get(label string) string + // TODO(thampiotr): test that Set and Del can be called while iterating. + Range(f func(label string, value string)) + Set(label string, val string) + Del(ns ...string) +} + +// ProcessBuilder is like Process, but the caller passes a labels.Builder +// containing the initial set of labels, which is mutated by the rules. +func ProcessBuilder(lb LabelBuilder, cfgs ...*Config) (keep bool) { + for _, cfg := range cfgs { + keep = doRelabel(cfg, lb) + if !keep { + return false + } + } + return true +} + +func doRelabel(cfg *Config, lb LabelBuilder) (keep bool) { + var va [16]string + values := va[:0] + if len(cfg.SourceLabels) > cap(values) { + values = make([]string, 0, len(cfg.SourceLabels)) + } + for _, ln := range cfg.SourceLabels { + values = append(values, lb.Get(string(ln))) + } + val := strings.Join(values, cfg.Separator) + + switch cfg.Action { + case Drop: + if cfg.Regex.MatchString(val) { + return false + } + case Keep: + if !cfg.Regex.MatchString(val) { + return false + } + case DropEqual: + if lb.Get(cfg.TargetLabel) == val { + return false + } + case KeepEqual: + if lb.Get(cfg.TargetLabel) != val { + return false + } + case Replace: + indexes := cfg.Regex.FindStringSubmatchIndex(val) + // If there is no match no replacement must take place. + if indexes == nil { + break + } + target := model.LabelName(cfg.Regex.ExpandString([]byte{}, cfg.TargetLabel, val, indexes)) + if !target.IsValid() { + break + } + res := cfg.Regex.ExpandString([]byte{}, cfg.Replacement, val, indexes) + if len(res) == 0 { + lb.Del(string(target)) + break + } + lb.Set(string(target), string(res)) + case Lowercase: + lb.Set(cfg.TargetLabel, strings.ToLower(val)) + case Uppercase: + lb.Set(cfg.TargetLabel, strings.ToUpper(val)) + case HashMod: + hash := md5.Sum([]byte(val)) + // Use only the last 8 bytes of the hash to give the same result as earlier versions of this code. + mod := binary.BigEndian.Uint64(hash[8:]) % cfg.Modulus + lb.Set(cfg.TargetLabel, fmt.Sprintf("%d", mod)) + case LabelMap: + lb.Range(func(name, value string) { + if cfg.Regex.MatchString(name) { + res := cfg.Regex.ReplaceAllString(name, cfg.Replacement) + lb.Set(res, value) + } + }) + case LabelDrop: + lb.Range(func(name, value string) { + if cfg.Regex.MatchString(name) { + lb.Del(name) + } + }) + case LabelKeep: + lb.Range(func(name, value string) { + if !cfg.Regex.MatchString(name) { + lb.Del(value) + } + }) + default: + panic(fmt.Errorf("relabel: unknown relabel action type %q", cfg.Action)) + } + + return true +} diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go index 72fcd47560..aeba717244 100644 --- a/internal/component/database_observability/mysql/component.go +++ b/internal/component/database_observability/mysql/component.go @@ -165,17 +165,17 @@ func (c *Component) Run(ctx context.Context) error { func (c *Component) getBaseTarget() (discovery.Target, error) { data, err := c.opts.GetServiceData(http_service.ServiceName) if err != nil { - return nil, fmt.Errorf("failed to get HTTP information: %w", err) + return discovery.EmptyTarget, fmt.Errorf("failed to get HTTP information: %w", err) } httpData := data.(http_service.Data) - return discovery.Target{ + return discovery.NewTargetFromMap(map[string]string{ model.AddressLabel: httpData.MemoryListenAddr, model.SchemeLabel: "http", model.MetricsPathLabel: path.Join(httpData.HTTPPathForComponent(c.opts.ID), "metrics"), "instance": c.instanceKey, "job": database_observability.JobName, - }, nil + }), nil } func (c *Component) Update(args component.Arguments) error { diff --git a/internal/component/discovery/benchmark_data/01_typical_pipeline_targets_as_maps.txt b/internal/component/discovery/benchmark_data/01_typical_pipeline_targets_as_maps.txt new file mode 100644 index 0000000000..8ca84b844d --- /dev/null +++ b/internal/component/discovery/benchmark_data/01_typical_pipeline_targets_as_maps.txt @@ -0,0 +1,16 @@ +goos: darwin +goarch: arm64 +pkg: github.com/grafana/alloy/internal/component/discovery +cpu: Apple M2 +Benchmark_Targets_TypicalPipeline-8 30 38546436 ns/op 64963036 B/op 241805 allocs/op +Benchmark_Targets_TypicalPipeline-8 28 38647682 ns/op 64953672 B/op 241803 allocs/op +Benchmark_Targets_TypicalPipeline-8 28 38550351 ns/op 64967889 B/op 241822 allocs/op +Benchmark_Targets_TypicalPipeline-8 30 38803608 ns/op 64960262 B/op 241807 allocs/op +Benchmark_Targets_TypicalPipeline-8 27 38605718 ns/op 64965312 B/op 241814 allocs/op +Benchmark_Targets_TypicalPipeline-8 28 38427967 ns/op 64958167 B/op 241812 allocs/op +Benchmark_Targets_TypicalPipeline-8 28 38815302 ns/op 64961392 B/op 241810 allocs/op +Benchmark_Targets_TypicalPipeline-8 28 39033807 ns/op 64964683 B/op 241812 allocs/op +Benchmark_Targets_TypicalPipeline-8 26 39444696 ns/op 64963500 B/op 241818 allocs/op +Benchmark_Targets_TypicalPipeline-8 26 38543861 ns/op 64961057 B/op 241808 allocs/op +PASS +ok github.com/grafana/alloy/internal/component/discovery 23.807s diff --git a/internal/component/discovery/benchmark_data/02_typical_pipeline_targets_as_capsule.txt b/internal/component/discovery/benchmark_data/02_typical_pipeline_targets_as_capsule.txt new file mode 100644 index 0000000000..31595da28f --- /dev/null +++ b/internal/component/discovery/benchmark_data/02_typical_pipeline_targets_as_capsule.txt @@ -0,0 +1,16 @@ +goos: darwin +goarch: arm64 +pkg: github.com/grafana/alloy/internal/component/discovery +cpu: Apple M2 +Benchmark_Targets_TypicalPipeline-8 24 45995441 ns/op 66573585 B/op 241368 allocs/op +Benchmark_Targets_TypicalPipeline-8 25 44810203 ns/op 66532546 B/op 241380 allocs/op +Benchmark_Targets_TypicalPipeline-8 24 44766747 ns/op 66531254 B/op 241373 allocs/op +Benchmark_Targets_TypicalPipeline-8 25 45150108 ns/op 66530506 B/op 241367 allocs/op +Benchmark_Targets_TypicalPipeline-8 25 47782998 ns/op 66576992 B/op 241376 allocs/op +Benchmark_Targets_TypicalPipeline-8 25 47254332 ns/op 66576284 B/op 241378 allocs/op +Benchmark_Targets_TypicalPipeline-8 24 44567898 ns/op 66536935 B/op 241394 allocs/op +Benchmark_Targets_TypicalPipeline-8 24 44572594 ns/op 66574589 B/op 241369 allocs/op +Benchmark_Targets_TypicalPipeline-8 25 44610702 ns/op 66532897 B/op 241375 allocs/op +Benchmark_Targets_TypicalPipeline-8 25 44945075 ns/op 66575368 B/op 241376 allocs/op +PASS +ok github.com/grafana/alloy/internal/component/discovery 23.817s diff --git a/internal/component/discovery/benchmark_data/03_typical_pipeline_optimised.txt b/internal/component/discovery/benchmark_data/03_typical_pipeline_optimised.txt new file mode 100644 index 0000000000..02d9ecf4cd --- /dev/null +++ b/internal/component/discovery/benchmark_data/03_typical_pipeline_optimised.txt @@ -0,0 +1,12 @@ +goos: darwin +goarch: arm64 +pkg: github.com/grafana/alloy/internal/component/discovery +cpu: Apple M2 +Benchmark_Targets_TypicalPipeline-8 72 15278984 ns/op 20085046 B/op 60435 allocs/op +Benchmark_Targets_TypicalPipeline-8 75 14940728 ns/op 20085236 B/op 60436 allocs/op +Benchmark_Targets_TypicalPipeline-8 75 15123676 ns/op 20085305 B/op 60437 allocs/op +Benchmark_Targets_TypicalPipeline-8 74 15007377 ns/op 20085855 B/op 60439 allocs/op +Benchmark_Targets_TypicalPipeline-8 75 15170948 ns/op 20085227 B/op 60436 allocs/op +Benchmark_Targets_TypicalPipeline-8 75 15316376 ns/op 20084762 B/op 60435 allocs/op +PASS +ok github.com/grafana/alloy/internal/component/discovery 13.170s diff --git a/internal/component/discovery/benchmark_data/04_typical_pipeline_cached_labels.txt b/internal/component/discovery/benchmark_data/04_typical_pipeline_cached_labels.txt new file mode 100644 index 0000000000..0686565aaf --- /dev/null +++ b/internal/component/discovery/benchmark_data/04_typical_pipeline_cached_labels.txt @@ -0,0 +1,12 @@ +goos: darwin +goarch: arm64 +pkg: github.com/grafana/alloy/internal/component/discovery +cpu: Apple M2 +Benchmark_Targets_TypicalPipeline-8 48 22740573 ns/op 21467362 B/op 100933 allocs/op +Benchmark_Targets_TypicalPipeline-8 45 23141183 ns/op 21466235 B/op 100932 allocs/op +Benchmark_Targets_TypicalPipeline-8 48 24125997 ns/op 21466766 B/op 100936 allocs/op +Benchmark_Targets_TypicalPipeline-8 46 22152172 ns/op 21469613 B/op 100944 allocs/op +Benchmark_Targets_TypicalPipeline-8 46 26163378 ns/op 21466312 B/op 100928 allocs/op +Benchmark_Targets_TypicalPipeline-8 38 41765575 ns/op 21539059 B/op 100944 allocs/op +PASS +ok github.com/grafana/alloy/internal/component/discovery 8.729s diff --git a/internal/component/discovery/benchmark_data/05_typical_pipeline_labels_wrappers.txt b/internal/component/discovery/benchmark_data/05_typical_pipeline_labels_wrappers.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/internal/component/discovery/benchmark_data/06_typical_pipeline_grouping.txt b/internal/component/discovery/benchmark_data/06_typical_pipeline_grouping.txt new file mode 100644 index 0000000000..857b64907b --- /dev/null +++ b/internal/component/discovery/benchmark_data/06_typical_pipeline_grouping.txt @@ -0,0 +1,12 @@ +goos: darwin +goarch: arm64 +pkg: github.com/grafana/alloy/internal/component/discovery +cpu: Apple M2 +Benchmark_Targets_TypicalPipeline-8 33 35633138 ns/op 10840681 B/op 80539 allocs/op +Benchmark_Targets_TypicalPipeline-8 34 32671718 ns/op 10840274 B/op 80535 allocs/op +Benchmark_Targets_TypicalPipeline-8 37 33207543 ns/op 10927221 B/op 80541 allocs/op +Benchmark_Targets_TypicalPipeline-8 34 33105222 ns/op 10840144 B/op 80529 allocs/op +Benchmark_Targets_TypicalPipeline-8 36 32274262 ns/op 10927934 B/op 80541 allocs/op +Benchmark_Targets_TypicalPipeline-8 37 32763682 ns/op 10840785 B/op 80536 allocs/op +PASS +ok github.com/grafana/alloy/internal/component/discovery 8.826s diff --git a/internal/component/discovery/benchmark_data/07_typical_pipeline_cleanup.txt b/internal/component/discovery/benchmark_data/07_typical_pipeline_cleanup.txt new file mode 100644 index 0000000000..b48f3126a6 --- /dev/null +++ b/internal/component/discovery/benchmark_data/07_typical_pipeline_cleanup.txt @@ -0,0 +1,12 @@ +goos: darwin +goarch: arm64 +pkg: github.com/grafana/alloy/internal/component/discovery +cpu: Apple M2 +Benchmark_Targets_TypicalPipeline-8 40 29411972 ns/op 10927567 B/op 80537 allocs/op +Benchmark_Targets_TypicalPipeline-8 40 29447266 ns/op 10927386 B/op 80537 allocs/op +Benchmark_Targets_TypicalPipeline-8 40 29141625 ns/op 10840319 B/op 80536 allocs/op +Benchmark_Targets_TypicalPipeline-8 40 29083859 ns/op 10840952 B/op 80541 allocs/op +Benchmark_Targets_TypicalPipeline-8 40 36991749 ns/op 10927696 B/op 80541 allocs/op +Benchmark_Targets_TypicalPipeline-8 39 29443703 ns/op 10927869 B/op 80538 allocs/op +PASS +ok github.com/grafana/alloy/internal/component/discovery 9.926s diff --git a/internal/component/discovery/discovery.go b/internal/component/discovery/discovery.go index 03670814e1..9be7520312 100644 --- a/internal/component/discovery/discovery.go +++ b/internal/component/discovery/discovery.go @@ -3,58 +3,17 @@ package discovery import ( "context" "fmt" - "slices" - "sort" - "strings" "sync" "time" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" - "github.com/prometheus/prometheus/model/labels" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/service/livedebugging" ) -// Target refers to a singular discovered endpoint found by a discovery -// component. -type Target map[string]string - -// Labels converts Target into a set of sorted labels. -func (t Target) Labels() labels.Labels { - var lset labels.Labels - for k, v := range t { - lset = append(lset, labels.Label{Name: k, Value: v}) - } - sort.Sort(lset) - return lset -} - -func (t Target) NonMetaLabels() labels.Labels { - var lset labels.Labels - for k, v := range t { - if !strings.HasPrefix(k, model.MetaLabelPrefix) { - lset = append(lset, labels.Label{Name: k, Value: v}) - } - } - sort.Sort(lset) - return lset -} - -func (t Target) SpecificLabels(lbls []string) labels.Labels { - var lset labels.Labels - for k, v := range t { - if slices.Contains(lbls, k) { - lset = append(lset, labels.Label{Name: k, Value: v}) - } - } - sort.Sort(lset) - return lset -} - // Exports holds values which are exported by all discovery components. type Exports struct { Targets []Target `alloy:"targets,attr"` @@ -269,17 +228,7 @@ func toAlloyTargets(cache map[string]*targetgroup.Group) []Target { for _, group := range cache { for _, target := range group.Targets { - tLabels := make(map[string]string, len(group.Labels)+len(target)) - - // first add the group labels, and then the - // target labels, so that target labels take precedence. - for k, v := range group.Labels { - tLabels[string(k)] = string(v) - } - for k, v := range target { - tLabels[string(k)] = string(v) - } - allTargets = append(allTargets, tLabels) + allTargets = append(allTargets, NewTargetFromSpecificAndBaseLabelSet(target, group.Labels)) } } return allTargets diff --git a/internal/component/discovery/discovery_test.go b/internal/component/discovery/discovery_test.go index 71927dba95..407ff5d7ef 100644 --- a/internal/component/discovery/discovery_test.go +++ b/internal/component/discovery/discovery_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/service/livedebugging" ) @@ -40,21 +41,21 @@ var updateTestCases = []discovererUpdateTestCase{ {Source: "test", Labels: model.LabelSet{"test_key": "value"}, Targets: []model.LabelSet{{"foo": "bar"}}}, }, expectedInitialExports: []component.Exports{ - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial export + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial export }, updatedTargets: []*targetgroup.Group{ {Source: "test", Labels: model.LabelSet{"test_key_2": "value"}, Targets: []model.LabelSet{{"baz": "bux"}}}, }, expectedUpdatedExports: []component.Exports{ - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial export - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial re-published on shutdown - Exports{Targets: []Target{{"test_key_2": "value", "baz": "bux"}}}, // Updated export + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial export + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial re-published on shutdown + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"test_key_2": "value", "baz": "bux"})}}, // Updated export }, expectedFinalExports: []component.Exports{ - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial export - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial re-published on shutdown - Exports{Targets: []Target{{"test_key_2": "value", "baz": "bux"}}}, // Updated export - Exports{Targets: []Target{{"test_key_2": "value", "baz": "bux"}}}, // Updated re-published on shutdown + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial export + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial re-published on shutdown + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"test_key_2": "value", "baz": "bux"})}}, // Updated export + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"test_key_2": "value", "baz": "bux"})}}, // Updated re-published on shutdown }, }, { @@ -86,15 +87,15 @@ var updateTestCases = []discovererUpdateTestCase{ {Source: "test", Labels: model.LabelSet{"test_key_2": "value"}, Targets: []model.LabelSet{{"baz": "bux"}}}, }, expectedUpdatedExports: []component.Exports{ - Exports{Targets: []Target{}}, // Initial publish - Exports{Targets: []Target{}}, // Initial re-published on shutdown - Exports{Targets: []Target{{"test_key_2": "value", "baz": "bux"}}}, // Updated export. + Exports{Targets: []Target{}}, // Initial publish + Exports{Targets: []Target{}}, // Initial re-published on shutdown + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"test_key_2": "value", "baz": "bux"})}}, // Updated export. }, expectedFinalExports: []component.Exports{ - Exports{Targets: []Target{}}, // Initial publish - Exports{Targets: []Target{}}, // Initial re-published on shutdown - Exports{Targets: []Target{{"test_key_2": "value", "baz": "bux"}}}, // Updated export. - Exports{Targets: []Target{{"test_key_2": "value", "baz": "bux"}}}, // Updated export re-published on shutdown. + Exports{Targets: []Target{}}, // Initial publish + Exports{Targets: []Target{}}, // Initial re-published on shutdown + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"test_key_2": "value", "baz": "bux"})}}, // Updated export. + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"test_key_2": "value", "baz": "bux"})}}, // Updated export re-published on shutdown. }, }, { @@ -103,19 +104,19 @@ var updateTestCases = []discovererUpdateTestCase{ {Source: "test", Labels: model.LabelSet{"test_key": "value"}, Targets: []model.LabelSet{{"foo": "bar"}}}, }, expectedInitialExports: []component.Exports{ - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial export + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial export }, updatedTargets: nil, expectedUpdatedExports: []component.Exports{ - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial export - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial re-published on shutdown - Exports{Targets: []Target{}}, // Updated export should publish empty! + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial export + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial re-published on shutdown + Exports{Targets: []Target{}}, // Updated export should publish empty! }, expectedFinalExports: []component.Exports{ - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial export - Exports{Targets: []Target{{"foo": "bar", "test_key": "value"}}}, // Initial re-published on shutdown - Exports{Targets: []Target{}}, // Updated export should publish empty! - Exports{Targets: []Target{}}, // Updated re-published on shutdown + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial export + Exports{Targets: []Target{NewTargetFromMap(map[string]string{"foo": "bar", "test_key": "value"})}}, // Initial re-published on shutdown + Exports{Targets: []Target{}}, // Updated export should publish empty! + Exports{Targets: []Target{}}, // Updated re-published on shutdown }, }, } @@ -175,7 +176,7 @@ func TestDiscoveryUpdates(t *testing.T) { require.EventuallyWithT(t, func(t *assert.CollectT) { publishedExportsMut.Lock() defer publishedExportsMut.Unlock() - assert.Equal(t, tc.expectedInitialExports, publishedExports) + assertExportsEqual(t, tc.expectedInitialExports, publishedExports) }, 3*time.Second, time.Millisecond) discoverer = newFakeDiscoverer() @@ -188,7 +189,7 @@ func TestDiscoveryUpdates(t *testing.T) { require.EventuallyWithT(t, func(t *assert.CollectT) { publishedExportsMut.Lock() defer publishedExportsMut.Unlock() - assert.Equal(t, tc.expectedUpdatedExports, publishedExports) + assertExportsEqual(t, tc.expectedUpdatedExports, publishedExports) }, 3*time.Second, time.Millisecond) ctxCancel() @@ -197,12 +198,24 @@ func TestDiscoveryUpdates(t *testing.T) { require.EventuallyWithT(t, func(t *assert.CollectT) { publishedExportsMut.Lock() defer publishedExportsMut.Unlock() - assert.Equal(t, tc.expectedFinalExports, publishedExports) + assertExportsEqual(t, tc.expectedFinalExports, publishedExports) }, 3*time.Second, time.Millisecond) }) } } +func assertExportsEqual(t *assert.CollectT, expected []component.Exports, actual []component.Exports) { + if actual == nil { + assert.NotNil(t, actual) + return + } + equal := equality.DeepEqual(expected, actual) + assert.True(t, equal, "expected and actual exports are different") + if !equal { // also do assert.Equal to get a nice diff view if there is an issue. + assert.Equal(t, expected, actual) + } +} + /* on darwin/arm64/Apple M2: Benchmark_ToAlloyTargets-8 150 7549967 ns/op 12768249 B/op 40433 allocs/op diff --git a/internal/component/discovery/distributed_targets.go b/internal/component/discovery/distributed_targets.go index 5e776f6b40..d4dc858011 100644 --- a/internal/component/discovery/distributed_targets.go +++ b/internal/component/discovery/distributed_targets.go @@ -99,11 +99,11 @@ func (dt *DistributedTargets) MovedToRemoteInstance(prev *DistributedTargets) [] } func keyFor(tgt Target) shard.Key { - return shard.Key(tgt.NonMetaLabels().Hash()) + return shard.Key(tgt.NonMetaLabelsHash()) } func keyForLabels(tgt Target, lbls []string) shard.Key { - return shard.Key(tgt.SpecificLabels(lbls).Hash()) + return shard.Key(tgt.SpecificLabelsHash(lbls)) } type disabledCluster struct{} diff --git a/internal/component/discovery/distributed_targets_test.go b/internal/component/discovery/distributed_targets_test.go index fb810859aa..bac2fae574 100644 --- a/internal/component/discovery/distributed_targets_test.go +++ b/internal/component/discovery/distributed_targets_test.go @@ -285,11 +285,11 @@ func BenchmarkDistributedTargets(b *testing.B) { } func mkTarget(kv ...string) Target { - target := make(Target) + target := make(map[string]string) for i := 0; i < len(kv); i += 2 { target[kv[i]] = kv[i+1] } - return target + return NewTargetFromMap(target) } func testDistTargets(lookupMap map[shard.Key][]peer.Peer) *DistributedTargets { diff --git a/internal/component/discovery/process/container.go b/internal/component/discovery/process/container.go index 1ac7842818..62e56af7c8 100644 --- a/internal/component/discovery/process/container.go +++ b/internal/component/discovery/process/container.go @@ -42,15 +42,15 @@ func getContainerIDFromK8S(k8sContainerID string) string { } func getContainerIDFromTarget(target discovery.Target) string { - cid, ok := target[labelProcessContainerID] + cid, ok := target.Get(labelProcessContainerID) if ok && cid != "" { return cid } - cid, ok = target["__meta_kubernetes_pod_container_id"] + cid, ok = target.Get("__meta_kubernetes_pod_container_id") if ok && cid != "" { return getContainerIDFromK8S(cid) } - cid, ok = target["__meta_docker_container_id"] + cid, ok = target.Get("__meta_docker_container_id") if ok && cid != "" { return cid } diff --git a/internal/component/discovery/process/discover.go b/internal/component/discovery/process/discover.go index 388c4f5823..c740af28b5 100644 --- a/internal/component/discovery/process/discover.go +++ b/internal/component/discovery/process/discover.go @@ -11,9 +11,10 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/alloy/internal/component/discovery" gopsutil "github.com/shirou/gopsutil/v3/process" "golang.org/x/sys/unix" + + "github.com/grafana/alloy/internal/component/discovery" ) const ( @@ -52,7 +53,7 @@ func convertProcesses(ps []process) []discovery.Target { } func convertProcess(p process) discovery.Target { - t := make(discovery.Target, 8) + t := make(map[string]string, 8) t[labelProcessID] = p.pid if p.exe != "" { t[labelProcessExe] = p.exe @@ -75,7 +76,7 @@ func convertProcess(p process) discovery.Target { if p.cgroupPath != "" { t[labelProcessCgroupPath] = p.cgroupPath } - return t + return discovery.NewTargetFromMap(t) } func discover(l log.Logger, cfg *DiscoverConfig) ([]process, error) { diff --git a/internal/component/discovery/process/join.go b/internal/component/discovery/process/join.go index 5c2728cc6e..5643d4be9d 100644 --- a/internal/component/discovery/process/join.go +++ b/internal/component/discovery/process/join.go @@ -2,7 +2,9 @@ package process -import "github.com/grafana/alloy/internal/component/discovery" +import ( + "github.com/grafana/alloy/internal/component/discovery" +) func join(processes, containers []discovery.Target) []discovery.Target { res := make([]discovery.Target, 0, len(processes)+len(containers)) @@ -27,14 +29,9 @@ func join(processes, containers []discovery.Target) []discovery.Target { res = append(res, p) continue } - mergedTarget := make(discovery.Target, len(p)+len(container)) - for k, v := range p { - mergedTarget[k] = v - } - for k, v := range container { - mergedTarget[k] = v - } - res = append(res, mergedTarget) + mergedBuilder := discovery.NewTargetBuilderFrom(p) + mergedBuilder.MergeWith(container) + res = append(res, mergedBuilder.Target()) } for _, target := range cid2container { res = append(res, target) diff --git a/internal/component/discovery/process/join_test.go b/internal/component/discovery/process/join_test.go index 3f2e9dfa5a..2aedd1c445 100644 --- a/internal/component/discovery/process/join_test.go +++ b/internal/component/discovery/process/join_test.go @@ -6,8 +6,9 @@ import ( "fmt" "testing" - "github.com/grafana/alloy/internal/component/discovery" "github.com/stretchr/testify/assert" + + "github.com/grafana/alloy/internal/component/discovery" ) func TestJoin(t *testing.T) { @@ -37,50 +38,50 @@ func TestJoin(t *testing.T) { containerID: "", }), }, []discovery.Target{ - { + discovery.NewTargetFromMap(map[string]string{ "__meta_docker_container_id": "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", "foo": "bar", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ "__meta_kubernetes_pod_container_id": "docker://47e320f795efcec1ecf2001c3a09c95e3701ed87de8256837b70b10e23818251", "qwe": "asd", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ "lol": "lol", - }, + }), }, []discovery.Target{ - { + discovery.NewTargetFromMap(map[string]string{ "__process_pid__": "239", "__meta_process_exe": "/bin/foo", "__meta_process_cwd": "/", "__container_id__": "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", "__meta_docker_container_id": "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", "foo": "bar", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ "__process_pid__": "240", "__meta_process_exe": "/bin/bar", "__meta_process_cwd": "/tmp", "__container_id__": "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", "__meta_docker_container_id": "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", "foo": "bar", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ "__meta_docker_container_id": "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", "foo": "bar", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ "__process_pid__": "241", "__meta_process_exe": "/bin/bash", "__meta_process_cwd": "/opt", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ "__meta_kubernetes_pod_container_id": "docker://47e320f795efcec1ecf2001c3a09c95e3701ed87de8256837b70b10e23818251", "qwe": "asd", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ "lol": "lol", - }, + }), }, }, { diff --git a/internal/component/discovery/relabel/relabel.go b/internal/component/discovery/relabel/relabel.go index 6613c6c801..4d2d545231 100644 --- a/internal/component/discovery/relabel/relabel.go +++ b/internal/component/discovery/relabel/relabel.go @@ -10,8 +10,6 @@ import ( "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/service/livedebugging" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" ) func init() { @@ -47,7 +45,6 @@ type Component struct { opts component.Options mut sync.RWMutex - rcs []*relabel.Config debugDataPublisher livedebugging.DebugDataPublisher } @@ -82,24 +79,23 @@ func (c *Component) Run(ctx context.Context) error { // Update implements component.Component. func (c *Component) Update(args component.Arguments) error { - c.mut.Lock() - defer c.mut.Unlock() - newArgs := args.(Arguments) targets := make([]discovery.Target, 0, len(newArgs.Targets)) - relabelConfigs := alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelConfigs) - c.rcs = relabelConfigs for _, t := range newArgs.Targets { - lset := componentMapToPromLabels(t) - relabelled, keep := relabel.Process(lset, relabelConfigs...) + var ( + relabelled discovery.Target + builder = discovery.NewTargetBuilderFrom(t) + keep = alloy_relabel.ProcessBuilder(builder, newArgs.RelabelConfigs...) + ) if keep { - targets = append(targets, promLabelsToComponent(relabelled)) + relabelled = builder.Target() + targets = append(targets, relabelled) } componentID := livedebugging.ComponentID(c.opts.ID) if c.debugDataPublisher.IsActive(componentID) { - c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lset.String(), relabelled.String())) + c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", t, relabelled)) } } @@ -112,21 +108,3 @@ func (c *Component) Update(args component.Arguments) error { } func (c *Component) LiveDebugging(_ int) {} - -func componentMapToPromLabels(ls discovery.Target) labels.Labels { - res := make([]labels.Label, 0, len(ls)) - for k, v := range ls { - res = append(res, labels.Label{Name: k, Value: v}) - } - - return res -} - -func promLabelsToComponent(ls labels.Labels) discovery.Target { - res := make(map[string]string, len(ls)) - for _, l := range ls { - res[l.Name] = l.Value - } - - return res -} diff --git a/internal/component/discovery/relabel/relabel_test.go b/internal/component/discovery/relabel/relabel_test.go index 6055fe05e0..920f5af4d4 100644 --- a/internal/component/discovery/relabel/relabel_test.go +++ b/internal/component/discovery/relabel/relabel_test.go @@ -4,12 +4,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/discovery/relabel" "github.com/grafana/alloy/internal/runtime/componenttest" "github.com/grafana/alloy/syntax" - "github.com/stretchr/testify/require" ) func TestRelabelConfigApplication(t *testing.T) { @@ -56,7 +57,7 @@ rule { } ` expectedOutput := []discovery.Target{ - map[string]string{"__address__": "localhost", "app": "backend", "destination": "localhost/one", "meta_bar": "bar", "meta_foo": "foo", "name": "one"}, + discovery.NewTargetFromMap(map[string]string{"__address__": "localhost", "app": "backend", "destination": "localhost/one", "meta_bar": "bar", "meta_foo": "foo", "name": "one"}), } var args relabel.Arguments diff --git a/internal/component/discovery/target.go b/internal/component/discovery/target.go new file mode 100644 index 0000000000..40f42ab28a --- /dev/null +++ b/internal/component/discovery/target.go @@ -0,0 +1,331 @@ +package discovery + +import ( + "fmt" + "slices" + "strings" + + "github.com/cespare/xxhash/v2" + commonlabels "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" + modellabels "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/alloy/internal/runtime/equality" + "github.com/grafana/alloy/syntax" +) + +type Target struct { + group commonlabels.LabelSet + own commonlabels.LabelSet + size int +} + +var ( + seps = []byte{'\xff'} + + _ syntax.Capsule = Target{} + _ syntax.ConvertibleIntoCapsule = Target{} + _ syntax.ConvertibleFromCapsule = &Target{} + _ equality.CustomEquality = Target{} +) + +func ComponentTargetsToPromTargetGroups(jobName string, tgs []Target) map[string][]*targetgroup.Group { + targetsWithCommonGroupLabels := map[uint64][]Target{} + for _, t := range tgs { + fp := t.groupLabelsHash() // TODO(thampiotr): could use a cache if it's on exactly the same slice + targetsWithCommonGroupLabels[fp] = append(targetsWithCommonGroupLabels[fp], t) + } + + allGroups := make([]*targetgroup.Group, 0, len(targetsWithCommonGroupLabels)) + + groupIndex := 0 + for _, targetsInGroup := range targetsWithCommonGroupLabels { + sharedLabels := targetsInGroup[0].group // all have the same group labels. + individualLabels := make([]commonlabels.LabelSet, len(targetsInGroup)) + for i, target := range targetsInGroup { + individualLabels[i] = target.own + } + promGroup := &targetgroup.Group{ + Source: fmt.Sprintf("%s_part_%d", jobName, groupIndex), + Labels: sharedLabels, + Targets: individualLabels, + } + allGroups = append(allGroups, promGroup) + } + return map[string][]*targetgroup.Group{jobName: allGroups} +} + +var EmptyTarget = Target{ + group: commonlabels.LabelSet{}, + own: commonlabels.LabelSet{}, + size: 0, +} + +func NewTargetFromLabelSet(ls commonlabels.LabelSet) Target { + return NewTargetFromSpecificAndBaseLabelSet(ls, nil) +} + +func NewTargetFromSpecificAndBaseLabelSet(own, group commonlabels.LabelSet) Target { + if group == nil { + group = commonlabels.LabelSet{} + } + if own == nil { + own = commonlabels.LabelSet{} + } + ret := Target{ + group: group, + own: own, + } + size := 0 + ret.ForEachLabel(func(key string, value string) bool { + size++ + return true + }) + ret.size = size + return ret +} + +// NewTargetFromModelLabels creates a target from model Labels. +// NOTE: this is not optimised and should be avoided on a hot path. +func NewTargetFromModelLabels(labels modellabels.Labels) Target { + l := make(commonlabels.LabelSet, len(labels)) + for _, label := range labels { + l[commonlabels.LabelName(label.Name)] = commonlabels.LabelValue(label.Value) + } + return NewTargetFromLabelSet(l) +} + +func NewTargetFromMap(m map[string]string) Target { + l := make(commonlabels.LabelSet, len(m)) + for k, v := range m { + l[commonlabels.LabelName(k)] = commonlabels.LabelValue(v) + } + return NewTargetFromLabelSet(l) +} + +// Labels converts this target into prometheus/prometheus/model/labels.Labels. It is not efficient and should be +// avoided on a hot path. +func (t Target) Labels() modellabels.Labels { + // This method allocates less than Builder or ScratchBuilder, as proven by benchmarks. + lb := make([]modellabels.Label, 0, t.Len()) + t.ForEachLabel(func(key string, value string) bool { + lb = append(lb, modellabels.Label{ + Name: key, + Value: value, + }) + return true + }) + slices.SortFunc(lb, func(a, b modellabels.Label) int { return strings.Compare(a.Name, b.Name) }) + return lb +} + +func (t Target) NonReservedLabelSet() commonlabels.LabelSet { + // This may not be the most optimal way, but this method is NOT a known hot spot at the time of this comment. + result := make(commonlabels.LabelSet, t.Len()) + t.ForEachLabel(func(key string, value string) bool { + if !strings.HasPrefix(key, commonlabels.ReservedLabelPrefix) { + result[commonlabels.LabelName(key)] = commonlabels.LabelValue(value) + } + return true + }) + return result +} + +// ForEachLabel runs f over each key value pair in the Target. f must not modify Target while iterating. If f returns +// false, the iteration is interrupted. If f returns true, the iteration continues until the last element. ForEachLabel +// returns true if all the labels were iterated over or false if any call to f has interrupted the iteration. +func (t Target) ForEachLabel(f func(key string, value string) bool) bool { + for k, v := range t.own { + if !f(string(k), string(v)) { + // f has returned false, interrupt the iteration and return false. + return false + } + } + // Now go over the group ones only if they were not part of own labels + for k, v := range t.group { + if _, ok := t.own[k]; ok { + continue + } + if !f(string(k), string(v)) { + // f has returned false, interrupt the iteration and return false. + return false + } + } + + // We finished the iteration, return true. + return true +} + +// AsMap returns target's labels as a map of strings. +// Deprecated: this should not be used on any hot path as it leads to more allocation. +func (t Target) AsMap() map[string]string { + ret := make(map[string]string, t.Len()) + t.ForEachLabel(func(key string, value string) bool { + ret[key] = value + return true + }) + return ret +} + +func (t Target) Get(key string) (string, bool) { + lv, ok := t.own[commonlabels.LabelName(key)] + if ok { + return string(lv), ok + } + lv, ok = t.group[commonlabels.LabelName(key)] + return string(lv), ok +} + +// LabelSet converts this target in to a LabelSet +// Deprecated: this is not optimised and should be avoided if possible. +func (t Target) LabelSet() commonlabels.LabelSet { + merged := make(commonlabels.LabelSet, t.Len()) + for k, v := range t.group { + merged[k] = v + } + for k, v := range t.own { + merged[k] = v + } + return merged +} + +func (t Target) Len() int { + return t.size +} + +// AlloyCapsule marks FastTarget as a capsule so Alloy syntax can marshal to or from it. +func (t Target) AlloyCapsule() {} + +// ConvertInto is called by Alloy syntax to try converting Target to another type. +func (t Target) ConvertInto(dst interface{}) error { + switch dst := dst.(type) { + case *map[string]syntax.Value: + result := make(map[string]syntax.Value, t.Len()) + t.ForEachLabel(func(key string, value string) bool { + result[key] = syntax.ValueFromString(value) + return true + }) + *dst = result + return nil + } + return fmt.Errorf("target::ConvertInto: conversion to '%T' is not supported", dst) +} + +// ConvertFrom is called by Alloy syntax to try converting from another type to Target. +func (t *Target) ConvertFrom(src interface{}) error { + switch src := src.(type) { + case map[string]syntax.Value: + labelSet := make(commonlabels.LabelSet, len(src)) + for k, v := range src { + if !v.IsString() { + return fmt.Errorf("target::ConvertFrom: cannot convert non-string values to labels") + } + labelSet[commonlabels.LabelName(k)] = commonlabels.LabelValue(v.Text()) + } + *t = NewTargetFromLabelSet(labelSet) + return nil + } + return fmt.Errorf("target: conversion from '%T' is not supported", src) +} + +func (t Target) String() string { + s := make([]string, 0, t.Len()) + t.ForEachLabel(func(key string, value string) bool { + s = append(s, fmt.Sprintf("%q=%q", key, value)) + return true + }) + slices.Sort(s) + return fmt.Sprintf("{%s}", strings.Join(s, ", ")) +} + +func (t Target) Equals(other any) bool { + if ot, ok := other.(Target); ok { + return t.EqualsTarget(ot) + } + return false +} + +func (t Target) EqualsTarget(other Target) bool { + if t.Len() != other.Len() { + return false + } + finished := t.ForEachLabel(func(key string, value string) bool { + otherValue, ok := other.Get(key) + if !ok || otherValue != value { + return false + } + return true + }) + return finished +} + +func (t Target) NonMetaLabelsHash() uint64 { + return t.HashLabelsWithPredicate(func(key string) bool { + return !strings.HasPrefix(key, commonlabels.MetaLabelPrefix) + }) +} + +func (t Target) SpecificLabelsHash(labelNames []string) uint64 { + return t.HashLabelsWithPredicate(func(key string) bool { + return slices.Contains(labelNames, key) + }) +} + +func (t Target) HashLabelsWithPredicate(pred func(key string) bool) uint64 { + // For hash to be deterministic, we need labels order to be deterministic too. Figure this out first. + labelsInOrder := make([]string, 0, t.Len()) // TODO(thampiotr): this can go to object pool? + t.ForEachLabel(func(key string, value string) bool { + if pred(value) { + labelsInOrder = append(labelsInOrder, key) + } + return true + }) + slices.Sort(labelsInOrder) + return t.hashLabelsInOrder(labelsInOrder) +} + +func (t Target) groupLabelsHash() uint64 { + // For hash to be deterministic, we need labels order to be deterministic too. Figure this out first. + // TODO(thampiotr): We could cache the hash somewhere if it is called often on the same data. + labelsInOrder := make([]string, 0, len(t.group)) // TODO(thampiotr): this can go to object pool? + for name := range t.group { + labelsInOrder = append(labelsInOrder, string(name)) + } + slices.Sort(labelsInOrder) + return t.hashLabelsInOrder(labelsInOrder) +} + +func (t Target) hashLabelsInOrder(order []string) uint64 { + // This optimisation is adapted from prometheus/model/labels. + // Use xxhash.Sum64(b) for fast path as it's faster. + b := make([]byte, 0, 1024) + mustGet := func(label string) string { + val, ok := t.Get(label) + if !ok { + panic("label concurrently modified - this is a bug - please report an issue") + } + return val + } + + for i, key := range order { + value := mustGet(key) + if len(b)+len(key)+len(value)+2 >= cap(b) { + // If labels entry is 1KB+ do not allocate whole entry. + h := xxhash.New() + _, _ = h.Write(b) + for _, key := range order[i:] { + _, _ = h.WriteString(key) + _, _ = h.Write(seps) + _, _ = h.WriteString(mustGet(key)) + _, _ = h.Write(seps) + } + return h.Sum64() + } + + b = append(b, key...) + b = append(b, seps[0]) + b = append(b, value...) + b = append(b, seps[0]) + } + return xxhash.Sum64(b) +} diff --git a/internal/component/discovery/target_builder.go b/internal/component/discovery/target_builder.go new file mode 100644 index 0000000000..f3acfa783a --- /dev/null +++ b/internal/component/discovery/target_builder.go @@ -0,0 +1,171 @@ +package discovery + +import ( + commonlabels "github.com/prometheus/common/model" + + "github.com/grafana/alloy/internal/component/common/relabel" +) + +const initialLabelsBuilderCapacity = 5 + +type TargetBuilder interface { + relabel.LabelBuilder + Target() Target + SetKV(kv ...string) TargetBuilder + MergeWith(Target) TargetBuilder +} + +// TODO(thampiotr): maybe all targets can have toDel map ready? +type targetBuilder struct { + group commonlabels.LabelSet + own commonlabels.LabelSet + + toAdd map[string]string + toDel map[string]struct{} +} + +func (t targetBuilder) SetKV(kv ...string) TargetBuilder { + for i := 0; i < len(kv); i += 2 { + t.Set(kv[i], kv[i+1]) + } + return t +} + +func (t targetBuilder) MergeWith(target Target) TargetBuilder { + // Not on a hot path, so doesn't really need to be optimised. + target.ForEachLabel(func(key string, value string) bool { + t.Set(key, value) + return true + }) + return t +} + +// NewTargetBuilder creates an empty labels builder. +func NewTargetBuilder() TargetBuilder { + return targetBuilder{ + group: nil, + own: make(commonlabels.LabelSet), + toAdd: make(map[string]string), + toDel: make(map[string]struct{}), + } +} + +func NewTargetBuilderFrom(t Target) TargetBuilder { + return NewTargetBuilderFromLabelSets(t.group, t.own) +} + +func NewTargetBuilderFromLabelSets(group, own commonlabels.LabelSet) TargetBuilder { + return targetBuilder{ + group: group, + own: own, + // TODO(thampiotr): we could postulate that builder is throw-away after .Target() and use object pool for these. + toAdd: make(map[string]string, initialLabelsBuilderCapacity), + toDel: make(map[string]struct{}, initialLabelsBuilderCapacity), + } +} + +func (t targetBuilder) Get(label string) string { + if v, ok := t.toAdd[label]; ok { + return v + } + if _, ok := t.toDel[label]; ok { + return "" + } + lv, ok := t.own[commonlabels.LabelName(label)] + if ok { + return string(lv) + } + lv, ok = t.group[commonlabels.LabelName(label)] + return string(lv) +} + +func (t targetBuilder) Range(f func(label string, value string)) { + for k, v := range t.toAdd { + f(k, v) + } + for k, v := range t.own { + if _, deleted := t.toDel[string(k)]; deleted { + continue + } + f(string(k), string(v)) + } + for k, v := range t.group { + if _, deleted := t.toDel[string(k)]; deleted { + continue + } + if _, inOwn := t.own[k]; inOwn { + continue + } + f(string(k), string(v)) + } +} + +func (t targetBuilder) Set(label string, val string) { + t.toAdd[label] = val +} + +func (t targetBuilder) Del(labels ...string) { + for _, label := range labels { + t.toDel[label] = struct{}{} + } +} + +// TODO(thampiotr): this can be more optimal still... +func (t targetBuilder) Target() Target { + if len(t.toAdd) == 0 && len(t.toDel) == 0 { + return NewTargetFromSpecificAndBaseLabelSet(t.own, t.group) + } + // Figure out if we need to modify own set + modifyOwn := false + if len(t.toAdd) > 0 { // if there is anything to add + modifyOwn = true + } else { + for label := range t.toDel { // if there is anything to delete + if _, ok := t.own[commonlabels.LabelName(label)]; ok { + modifyOwn = true + break + } + } + } + + modifyGroup := false + for label := range t.toDel { // if there is anything to delete from group + if _, ok := t.group[commonlabels.LabelName(label)]; ok { + modifyGroup = true + break + } + } + + var ( + newOwn = t.own + newGroup = t.group + ) + + if modifyOwn { + // TODO(thampiotr): further benchmarking is needed here, but if this is causing a lot of allocation, we could + // try implementing Target with toAdd and toDel overlays instead. + newOwn = make(commonlabels.LabelSet, len(t.own)+len(t.toAdd)) + for k, v := range t.own { + if _, ok := t.toDel[string(k)]; ok { + continue + } + newOwn[k] = v + } + for k, v := range t.toAdd { + newOwn[commonlabels.LabelName(k)] = commonlabels.LabelValue(v) + } + } + if modifyGroup { + // TODO(thampiotr): When relabeling a lot of targets we possibly will produce a lot of t.groups that will + // all be the same. So maybe we need a step to consolidate them? + newGroup = make(commonlabels.LabelSet, len(t.group)) + for k, v := range t.group { + if _, ok := t.toDel[string(k)]; ok { + continue + } + newGroup[k] = v + } + } + + return NewTargetFromSpecificAndBaseLabelSet(newOwn, newGroup) +} diff --git a/internal/component/discovery/target_test.go b/internal/component/discovery/target_test.go new file mode 100644 index 0000000000..31bf22f506 --- /dev/null +++ b/internal/component/discovery/target_test.go @@ -0,0 +1,207 @@ +package discovery + +import ( + "fmt" + "reflect" + "slices" + "testing" + + "github.com/Masterminds/goutils" + "github.com/grafana/ckit/peer" + "github.com/grafana/ckit/shard" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/runtime/equality" + "github.com/grafana/alloy/syntax/parser" + "github.com/grafana/alloy/syntax/vm" +) + +func TestDecodeMap(t *testing.T) { + scope := vm.NewScope(map[string]interface{}{ + "foobar": 42, + }) + + input := `{ a = "5", b = "10" }` + expected := NewTargetFromMap(map[string]string{"a": "5", "b": "10"}) + + expr, err := parser.ParseExpression(input) + require.NoError(t, err) + + eval := vm.New(expr) + actual := Target{} + require.NoError(t, eval.Evaluate(scope, &actual)) + require.Equal(t, expected, actual) + + // Test can iterate over it + var seen []string + actual.ForEachLabel(func(k string, v string) bool { + seen = append(seen, fmt.Sprintf("%s=%s", k, v)) + return true + }) + slices.Sort(seen) + require.Equal(t, []string{"a=5", "b=10"}, seen) + + // Some loggers print targets out, check it's all good. + require.Equal(t, `{"a"="5", "b"="10"}`, fmt.Sprintf("%s", actual)) +} + +func TestTargetBuilder(t *testing.T) { + target := NewTargetFromMap(map[string]string{"a": "5", "b": "10"}) + builder := NewTargetBuilderFrom(target) + + builder.Set("foo", "bar") + get, ok := builder.Target().Get("foo") + require.True(t, ok) + require.Equal(t, "bar", get) + + builder = NewTargetBuilderFrom(target) + builder.Del("foo") + get, ok = builder.Target().Get("foo") + require.False(t, ok) + require.Equal(t, "", get) + + // Test setting on empty target (verifies it won't panic) + NewTargetBuilder().Set("foo", "bar") +} + +func TestConvertFromNative(t *testing.T) { + var nativeTargets = []model.LabelSet{ + {model.LabelName("hip"): model.LabelValue("hop")}, + {model.LabelName("nae"): model.LabelValue("nae")}, + } + + nativeGroup := &targetgroup.Group{ + Targets: nativeTargets, + Labels: model.LabelSet{ + model.LabelName("boom"): model.LabelValue("bap"), + }, + Source: "test", + } + + expected := []Target{ + NewTargetFromMap(map[string]string{"hip": "hop", "boom": "bap"}), + NewTargetFromMap(map[string]string{"nae": "nae", "boom": "bap"}), + } + + require.True(t, equality.DeepEqual(expected, toAlloyTargets(map[string]*targetgroup.Group{"test": nativeGroup}))) +} + +func TestEquals_Basic(t *testing.T) { + // NOTE: if we start caching anything as a field, the equality may break. We should test it. + t1 := NewTargetFromMap(map[string]string{"hip": "hop", "boom": "bap"}) + require.Equal(t, 2, t1.Len()) + tb := NewTargetBuilderFrom(t1) + tb.Set("boom", "bap") + t2 := tb.Target() + // This is a way commonly used in tests. + require.Equal(t, t1, t2) + // This is the way exports are compared in BuiltinComponentNode.setExports, and it's important for performance that + // Targets equality is working correctly. + require.True(t, reflect.DeepEqual(t1, t2)) +} + +// TODO(thampiotr): will need a lot more tests like this and with a builder +func TestEquals_Custom(t *testing.T) { + t1 := NewTargetFromSpecificAndBaseLabelSet( + model.LabelSet{"foo": "bar"}, + model.LabelSet{"hip": "hop"}, + ) + t2 := NewTargetFromSpecificAndBaseLabelSet( + nil, + model.LabelSet{"hip": "hop", "foo": "bar"}, + ) + require.NotEqual(t, t1, t2) + require.True(t, t1.EqualsTarget(t2)) + require.True(t, t1.Equals(t2)) +} + +func Benchmark_Targets_TypicalPipeline(b *testing.B) { + sharedLabels := 5 + labelsPerTarget := 5 + labelsLength := 10 + targetsCount := 20_000 + numPeers := 10 + + genLabelSet := func(size int) model.LabelSet { + ls := model.LabelSet{} + for i := 0; i < size; i++ { + name, _ := goutils.RandomAlphaNumeric(labelsLength) + value, _ := goutils.RandomAlphaNumeric(labelsLength) + ls[model.LabelName(name)] = model.LabelValue(value) + } + return ls + } + + var labelSets []model.LabelSet + for i := 0; i < targetsCount; i++ { + labelSets = append(labelSets, genLabelSet(labelsPerTarget)) + } + + cache := map[string]*targetgroup.Group{} + cache["test"] = &targetgroup.Group{ + Targets: labelSets, + Labels: genLabelSet(sharedLabels), + Source: "test", + } + + peers := make([]peer.Peer, 0, numPeers) + for i := 0; i < numPeers; i++ { + peerName := fmt.Sprintf("peer_%d", i) + peers = append(peers, peer.Peer{Name: peerName, Addr: peerName, Self: i == 0, State: peer.StateParticipant}) + } + + cluster := &randomCluster{ + peers: peers, + peersByIndex: make(map[int][]peer.Peer, len(peers)), + } + + b.ResetTimer() + + var prevDistTargets *DistributedTargets + for i := 0; i < b.N; i++ { + // Creating the targets in discovery + targets := toAlloyTargets(cache) + + // Relabel of targets in discovery.relabel + for ind := range targets { + builder := NewTargetBuilderFrom(targets[ind]) + // would do alloy_relabel.ProcessBuilder here to relabel + targets[ind] = builder.Target() + } + + // discovery.scrape: distributing targets for clustering + dt := NewDistributedTargets(true, cluster, targets) + _ = dt.LocalTargets() + _ = dt.MovedToRemoteInstance(prevDistTargets) + // Sending LabelSet to Prometheus library for scraping + _ = ComponentTargetsToPromTargetGroups("test", targets) + + // Remote write happens on a sample level and largely outside Alloy's codebase, so skipping here. + + prevDistTargets = dt + } +} + +type randomCluster struct { + peers []peer.Peer + // stores results in a map to reduce the allocation noise in the benchmark + peersByIndex map[int][]peer.Peer +} + +func (f *randomCluster) Lookup(key shard.Key, _ int, _ shard.Op) ([]peer.Peer, error) { + ind := int(key) + if ind < 0 { + ind = -ind + } + peerIndex := ind % len(f.peers) + if _, ok := f.peersByIndex[peerIndex]; !ok { + f.peersByIndex[peerIndex] = []peer.Peer{f.peers[peerIndex]} + } + return f.peersByIndex[peerIndex], nil +} + +func (f *randomCluster) Peers() []peer.Peer { + return f.peers +} diff --git a/internal/component/local/file_match/file_test.go b/internal/component/local/file_match/file_test.go index 63645315bd..b63cdb9e41 100644 --- a/internal/component/local/file_match/file_test.go +++ b/internal/component/local/file_match/file_test.go @@ -4,6 +4,7 @@ package file_match import ( + "context" "os" "path" "strings" @@ -12,12 +13,11 @@ import ( "github.com/grafana/alloy/internal/component/discovery" - "context" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" ) func TestFile(t *testing.T) { @@ -254,7 +254,9 @@ func TestMultiLabels(t *testing.T) { "foo": "bar", "fruit": "apple", }) - c.args.PathTargets[0]["newlabel"] = "test" + tb := discovery.NewTargetBuilderFrom(c.args.PathTargets[0]) + tb.Set("newlabel", "test") + c.args.PathTargets[0] = tb.Target() ct := context.Background() ct, ccl := context.WithTimeout(ct, 40*time.Second) defer ccl() @@ -278,14 +280,15 @@ func createComponent(t *testing.T, dir string, paths []string, excluded []string func createComponentWithLabels(t *testing.T, dir string, paths []string, excluded []string, labels map[string]string) *Component { tPaths := make([]discovery.Target, 0) for i, p := range paths { - tar := discovery.Target{"__path__": p} + tb := discovery.NewTargetBuilder() + tb.Set("__path__", p) for k, v := range labels { - tar[k] = v + tb.Set(k, v) } if i < len(excluded) { - tar["__path_exclude__"] = excluded[i] + tb.Set("__path_exclude__", excluded[i]) } - tPaths = append(tPaths, tar) + tPaths = append(tPaths, tb.Target()) } c, err := New(component.Options{ ID: "test", @@ -308,7 +311,7 @@ func createComponentWithLabels(t *testing.T, dir string, paths []string, exclude func contains(sources []discovery.Target, match string) bool { for _, s := range sources { - p := s["__path__"] + p, _ := s.Get("__path__") if strings.Contains(p, match) { return true } diff --git a/internal/component/local/file_match/watch.go b/internal/component/local/file_match/watch.go index 04d8e456c3..25682ca878 100644 --- a/internal/component/local/file_match/watch.go +++ b/internal/component/local/file_match/watch.go @@ -59,21 +59,20 @@ func (w *watch) getPaths() ([]discovery.Target, error) { continue } - dt := discovery.Target{} - for dk, v := range w.target { - dt[dk] = v - } - dt["__path__"] = abs - allMatchingPaths = append(allMatchingPaths, dt) + tb := discovery.NewTargetBuilderFrom(w.target) + tb.Set("__path__", abs) + allMatchingPaths = append(allMatchingPaths, tb.Target()) } return allMatchingPaths, nil } func (w *watch) getPath() string { - return w.target["__path__"] + path, _ := w.target.Get("__path__") + return path } func (w *watch) getExcludePath() string { - return w.target["__path_exclude__"] + excludePath, _ := w.target.Get("__path_exclude__") + return excludePath } diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 7558b0fc6c..a8962d73f6 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -400,7 +400,7 @@ stage.static_labels { go func() { err := ctrl.Run(ctx, lsf.Arguments{ - Targets: []discovery.Target{{"__path__": f.Name(), "somelbl": "somevalue"}}, + Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{"__path__": f.Name(), "somelbl": "somevalue"})}, ForwardTo: []loki.LogsReceiver{ tc1.Exports().(Exports).Receiver, tc2.Exports().(Exports).Receiver, diff --git a/internal/component/loki/relabel/relabel_test.go b/internal/component/loki/relabel/relabel_test.go index b0abe69967..ed810bc47b 100644 --- a/internal/component/loki/relabel/relabel_test.go +++ b/internal/component/loki/relabel/relabel_test.go @@ -346,7 +346,7 @@ rule { go func() { err := ctrl.Run(context.Background(), lsf.Arguments{ - Targets: []discovery.Target{{"__path__": f.Name(), "somelbl": "somevalue"}}, + Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{"__path__": f.Name(), "somelbl": "somevalue"})}, ForwardTo: []loki.LogsReceiver{ tc1.Exports().(Exports).Receiver, tc2.Exports().(Exports).Receiver, diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index ffeb9d26e6..12e36cd42a 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -16,6 +16,10 @@ import ( "github.com/docker/docker/client" "github.com/go-kit/log" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/grafana/alloy/internal/component" types "github.com/grafana/alloy/internal/component/common/config" "github.com/grafana/alloy/internal/component/common/loki" @@ -26,9 +30,6 @@ import ( "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/useragent" - "github.com/prometheus/common/config" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/relabel" ) func init() { @@ -227,11 +228,8 @@ func (c *Component) Update(args component.Arguments) error { promTargets := make([]promTarget, len(newArgs.Targets)) for i, target := range newArgs.Targets { - var labels = make(model.LabelSet) - for k, v := range target { - labels[model.LabelName(k)] = model.LabelValue(v) - } - promTargets[i] = promTarget{labels: labels, fingerPrint: labels.Fingerprint()} + labelsCopy := target.LabelSet() + promTargets[i] = promTarget{labels: labelsCopy, fingerPrint: labelsCopy.Fingerprint()} } // Sorting the targets before filtering ensures consistent filtering of targets diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index dc1222e1fd..e283b44596 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -5,18 +5,18 @@ import ( "fmt" "os" "path/filepath" - "strings" "sync" "time" + "github.com/grafana/tail/watch" + "github.com/prometheus/common/model" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/common/loki/positions" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" - "github.com/grafana/tail/watch" - "github.com/prometheus/common/model" ) func init() { @@ -199,15 +199,9 @@ func (c *Component) Update(args component.Arguments) error { } for _, target := range newArgs.Targets { - path := target[pathLabel] + path, _ := target.Get(pathLabel) - labels := make(model.LabelSet) - for k, v := range target { - if strings.HasPrefix(k, model.ReservedLabelPrefix) { - continue - } - labels[model.LabelName(k)] = model.LabelValue(v) - } + labels := target.NonReservedLabelSet() // Deduplicate targets which have the same public label set. readersKey := positions.Entry{Path: path, Labels: labels.String()} diff --git a/internal/component/loki/source/file/file_test.go b/internal/component/loki/source/file/file_test.go index fcd418b6f1..9d9482dc9a 100644 --- a/internal/component/loki/source/file/file_test.go +++ b/internal/component/loki/source/file/file_test.go @@ -11,16 +11,17 @@ import ( "testing" "time" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/component/common/loki" - "github.com/grafana/alloy/internal/component/discovery" - "github.com/grafana/alloy/internal/runtime/componenttest" - "github.com/grafana/alloy/internal/util" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/goleak" "golang.org/x/text/encoding/unicode" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/util" ) func Test(t *testing.T) { @@ -41,10 +42,10 @@ func Test(t *testing.T) { go func() { err := ctrl.Run(ctx, Arguments{ - Targets: []discovery.Target{{ + Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{ "__path__": f.Name(), "foo": "bar", - }}, + })}, ForwardTo: []loki.LogsReceiver{ch1, ch2}, }) require.NoError(t, err) @@ -91,10 +92,10 @@ func TestFileWatch(t *testing.T) { ch1 := loki.NewLogsReceiver() args := Arguments{ - Targets: []discovery.Target{{ + Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{ "__path__": f.Name(), "foo": "bar", - }}, + })}, ForwardTo: []loki.LogsReceiver{ch1}, FileWatch: FileWatch{ MinPollFrequency: time.Millisecond * 500, @@ -150,10 +151,10 @@ func TestUpdate_NoLeak(t *testing.T) { require.NoError(t, err) args := Arguments{ - Targets: []discovery.Target{{ + Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{ "__path__": f.Name(), "foo": "bar", - }}, + })}, ForwardTo: []loki.LogsReceiver{}, } @@ -195,8 +196,8 @@ func TestTwoTargets(t *testing.T) { ch1 := loki.NewLogsReceiver() args := Arguments{} args.Targets = []discovery.Target{ - {"__path__": f.Name(), "foo": "bar"}, - {"__path__": f2.Name(), "foo": "bar2"}, + discovery.NewTargetFromMap(map[string]string{"__path__": f.Name(), "foo": "bar"}), + discovery.NewTargetFromMap(map[string]string{"__path__": f2.Name(), "foo": "bar2"}), } args.ForwardTo = []loki.LogsReceiver{ch1} @@ -265,7 +266,7 @@ func TestEncoding(t *testing.T) { ch1 := loki.NewLogsReceiver() args := Arguments{} - args.Targets = []discovery.Target{{"__path__": f.Name(), "lbl1": "val1"}} + args.Targets = []discovery.Target{discovery.NewTargetFromMap(map[string]string{"__path__": f.Name(), "lbl1": "val1"})} args.Encoding = "UTF-16BE" args.ForwardTo = []loki.LogsReceiver{ch1} diff --git a/internal/component/loki/source/kubernetes/kubernetes_test.go b/internal/component/loki/source/kubernetes/kubernetes_test.go index dd16a4fa3d..703ee63527 100644 --- a/internal/component/loki/source/kubernetes/kubernetes_test.go +++ b/internal/component/loki/source/kubernetes/kubernetes_test.go @@ -1,13 +1,15 @@ package kubernetes import ( + "testing" + "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/loki/source/kubernetes/kubetail" "github.com/grafana/alloy/internal/service/cluster" - "testing" - "github.com/grafana/alloy/syntax" "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/syntax" ) func TestAlloyConfig(t *testing.T) { @@ -55,20 +57,20 @@ func TestClusteringDuplicateAddress(t *testing.T) { true, cluster.Mock(), []discovery.Target{ - { + discovery.NewTargetFromMap(map[string]string{ "__address__": "localhost:9090", "container": "alloy", "pod": "grafana-k8s-monitoring-alloy-0", "job": "integrations/alloy", "namespace": "default", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ "__address__": "localhost:8080", "container": "alloy", "pod": "grafana-k8s-monitoring-alloy-0", "job": "integrations/alloy", "namespace": "default", - }, + }), }, kubetail.ClusteringLabels, ) diff --git a/internal/component/loki/write/write_test.go b/internal/component/loki/write/write_test.go index 7ee1c95b21..7015878627 100644 --- a/internal/component/loki/write/write_test.go +++ b/internal/component/loki/write/write_test.go @@ -272,7 +272,7 @@ func testMultipleEndpoint(t *testing.T, alterArgs func(arguments *Arguments)) { go func() { err := ctrl.Run(context.Background(), lsf.Arguments{ - Targets: []discovery.Target{{"__path__": f.Name(), "somelbl": "somevalue"}}, + Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{"__path__": f.Name(), "somelbl": "somevalue"})}, ForwardTo: []loki.LogsReceiver{ tc1.Exports().(Exports).Receiver, tc2.Exports().(Exports).Receiver, diff --git a/internal/component/prometheus/exporter/blackbox/blackbox.go b/internal/component/prometheus/exporter/blackbox/blackbox.go index 0803fd6746..7a40ff736b 100644 --- a/internal/component/prometheus/exporter/blackbox/blackbox.go +++ b/internal/component/prometheus/exporter/blackbox/blackbox.go @@ -45,14 +45,15 @@ func buildBlackboxTargets(baseTarget discovery.Target, args component.Arguments) } for _, tgt := range blackboxTargets { - target := make(discovery.Target) + target := make(map[string]string, len(tgt.Labels)+baseTarget.Len()) // Set extra labels first, meaning that any other labels will override for k, v := range tgt.Labels { target[k] = v } - for k, v := range baseTarget { - target[k] = v - } + baseTarget.ForEachLabel(func(key string, value string) bool { + target[key] = value + return true + }) target["job"] = target["job"] + "/" + tgt.Name target["__param_target"] = tgt.Target @@ -60,7 +61,7 @@ func buildBlackboxTargets(baseTarget discovery.Target, args component.Arguments) target["__param_module"] = tgt.Module } - targets = append(targets, target) + targets = append(targets, discovery.NewTargetFromMap(target)) } return targets diff --git a/internal/component/prometheus/exporter/blackbox/blackbox_test.go b/internal/component/prometheus/exporter/blackbox/blackbox_test.go index 59c6fd2d21..3ad36020de 100644 --- a/internal/component/prometheus/exporter/blackbox/blackbox_test.go +++ b/internal/component/prometheus/exporter/blackbox/blackbox_test.go @@ -4,13 +4,14 @@ import ( "testing" "time" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/component/discovery" - "github.com/grafana/alloy/syntax" blackbox_config "github.com/prometheus/blackbox_exporter/config" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/syntax" ) func TestUnmarshalAlloy(t *testing.T) { @@ -259,20 +260,20 @@ func TestBuildBlackboxTargets(t *testing.T) { Targets: TargetBlock{{Name: "target_a", Target: "http://example.com", Module: "http_2xx"}}, ProbeTimeoutOffset: 1.0, } - baseTarget := discovery.Target{ + baseTarget := discovery.NewTargetFromMap(map[string]string{ model.SchemeLabel: "http", model.MetricsPathLabel: "component/prometheus.exporter.blackbox.default/metrics", "instance": "prometheus.exporter.blackbox.default", "job": "integrations/blackbox", "__meta_agent_integration_name": "blackbox", "__meta_agent_integration_instance": "prometheus.exporter.blackbox.default", - } + }) args := component.Arguments(baseArgs) targets := buildBlackboxTargets(baseTarget, args) require.Equal(t, 1, len(targets)) - require.Equal(t, "integrations/blackbox/target_a", targets[0]["job"]) - require.Equal(t, "http://example.com", targets[0]["__param_target"]) - require.Equal(t, "http_2xx", targets[0]["__param_module"]) + requireTargetLabel(t, targets[0], "job", "integrations/blackbox/target_a") + requireTargetLabel(t, targets[0], "__param_target", "http://example.com") + requireTargetLabel(t, targets[0], "__param_module", "http_2xx") } func TestBuildBlackboxTargetsWithExtraLabels(t *testing.T) { @@ -289,23 +290,23 @@ func TestBuildBlackboxTargetsWithExtraLabels(t *testing.T) { }}, ProbeTimeoutOffset: 1.0, } - baseTarget := discovery.Target{ + baseTarget := discovery.NewTargetFromMap(map[string]string{ model.SchemeLabel: "http", model.MetricsPathLabel: "component/prometheus.exporter.blackbox.default/metrics", "instance": "prometheus.exporter.blackbox.default", "job": "integrations/blackbox", "__meta_agent_integration_name": "blackbox", "__meta_agent_integration_instance": "prometheus.exporter.blackbox.default", - } + }) args := component.Arguments(baseArgs) targets := buildBlackboxTargets(baseTarget, args) require.Equal(t, 1, len(targets)) - require.Equal(t, "integrations/blackbox/target_a", targets[0]["job"]) - require.Equal(t, "http://example.com", targets[0]["__param_target"]) - require.Equal(t, "http_2xx", targets[0]["__param_module"]) + requireTargetLabel(t, targets[0], "job", "integrations/blackbox/target_a") + requireTargetLabel(t, targets[0], "__param_target", "http://example.com") + requireTargetLabel(t, targets[0], "__param_module", "http_2xx") - require.Equal(t, "test", targets[0]["env"]) - require.Equal(t, "bar", targets[0]["foo"]) + requireTargetLabel(t, targets[0], "env", "test") + requireTargetLabel(t, targets[0], "foo", "bar") // Check that the extra labels do not override existing labels baseArgs.Targets[0].Labels = map[string]string{ @@ -315,8 +316,8 @@ func TestBuildBlackboxTargetsWithExtraLabels(t *testing.T) { args = component.Arguments(baseArgs) targets = buildBlackboxTargets(baseTarget, args) require.Equal(t, 1, len(targets)) - require.Equal(t, "integrations/blackbox/target_a", targets[0]["job"]) - require.Equal(t, "prometheus.exporter.blackbox.default", targets[0]["instance"]) + requireTargetLabel(t, targets[0], "job", "integrations/blackbox/target_a") + requireTargetLabel(t, targets[0], "instance", "prometheus.exporter.blackbox.default") } // Test convert from TargetsList to []blackbox_exporter.BlackboxTarget @@ -451,3 +452,10 @@ func TestValidateTargetsMutualExclusivity(t *testing.T) { } require.ErrorContains(t, args.Validate(), "the block `target` and the attribute `targets` are mutually exclusive") } + +func requireTargetLabel(t *testing.T, target discovery.Target, label, expectedValue string) { + t.Helper() + actual, ok := target.Get(label) + require.True(t, ok) + require.Equal(t, expectedValue, actual) +} diff --git a/internal/component/prometheus/exporter/exporter.go b/internal/component/prometheus/exporter/exporter.go index 466af3c3a1..9714ce3624 100644 --- a/internal/component/prometheus/exporter/exporter.go +++ b/internal/component/prometheus/exporter/exporter.go @@ -9,12 +9,13 @@ import ( "strings" "sync" + "github.com/prometheus/common/model" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/runtime/logging/level" http_service "github.com/grafana/alloy/internal/service/http" "github.com/grafana/alloy/internal/static/integrations" - "github.com/prometheus/common/model" ) // Creator is a function provided by an implementation to create a concrete exporter instance. @@ -92,7 +93,9 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() c.exporter = exporter if instanceKey != "" { - c.baseTarget["instance"] = instanceKey + tb := discovery.NewTargetBuilderFrom(c.baseTarget) + tb.Set("instance", instanceKey) + c.baseTarget = tb.Target() } var targets []discovery.Target @@ -142,7 +145,7 @@ func newExporter(creator Creator, name string, targetBuilderFunc func(discovery. componentName = opts.ID } - c.baseTarget = discovery.Target{ + c.baseTarget = discovery.NewTargetFromMap(map[string]string{ model.AddressLabel: httpData.MemoryListenAddr, model.SchemeLabel: "http", model.MetricsPathLabel: path.Join(httpData.HTTPPathForComponent(opts.ID), "metrics"), @@ -150,8 +153,7 @@ func newExporter(creator Creator, name string, targetBuilderFunc func(discovery. "job": jobName, "__meta_component_name": componentName, "__meta_component_id": opts.ID, - } - + }) // Call to Update() to set the output once at the start. if err := c.Update(args); err != nil { return nil, err diff --git a/internal/component/prometheus/exporter/kafka/kafka.go b/internal/component/prometheus/exporter/kafka/kafka.go index 26e64a7421..6c64821f6a 100644 --- a/internal/component/prometheus/exporter/kafka/kafka.go +++ b/internal/component/prometheus/exporter/kafka/kafka.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/IBM/sarama" + "github.com/prometheus/common/config" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/prometheus/exporter" @@ -11,7 +13,6 @@ import ( "github.com/grafana/alloy/internal/static/integrations" "github.com/grafana/alloy/internal/static/integrations/kafka_exporter" "github.com/grafana/alloy/syntax/alloytypes" - "github.com/prometheus/common/config" ) var DefaultArguments = Arguments{ @@ -92,13 +93,13 @@ func (a *Arguments) Validate() error { func customizeTarget(baseTarget discovery.Target, args component.Arguments) []discovery.Target { a := args.(Arguments) - target := baseTarget + targetBuilder := discovery.NewTargetBuilderFrom(baseTarget) if len(a.KafkaURIs) > 1 { - target["instance"] = a.Instance + targetBuilder.Set("instance", a.Instance) } else { - target["instance"] = a.KafkaURIs[0] + targetBuilder.Set("instance", a.KafkaURIs[0]) } - return []discovery.Target{target} + return []discovery.Target{targetBuilder.Target()} } func createExporter(opts component.Options, args component.Arguments, defaultInstanceKey string) (integrations.Integration, string, error) { diff --git a/internal/component/prometheus/exporter/kafka/kafka_test.go b/internal/component/prometheus/exporter/kafka/kafka_test.go index 58f165402f..d257ad6068 100644 --- a/internal/component/prometheus/exporter/kafka/kafka_test.go +++ b/internal/component/prometheus/exporter/kafka/kafka_test.go @@ -3,10 +3,11 @@ package kafka import ( "testing" + "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/static/integrations/kafka_exporter" "github.com/grafana/alloy/syntax" - "github.com/stretchr/testify/require" ) func TestAlloyUnmarshal(t *testing.T) { @@ -116,7 +117,9 @@ func TestCustomizeTarget(t *testing.T) { baseTarget := discovery.Target{} newTargets := customizeTarget(baseTarget, args) require.Equal(t, 1, len(newTargets)) - require.Equal(t, "example", newTargets[0]["instance"]) + val, ok := newTargets[0].Get("instance") + require.True(t, ok) + require.Equal(t, "example", val) } func TestSASLPassword(t *testing.T) { // #6044 diff --git a/internal/component/prometheus/exporter/snmp/snmp.go b/internal/component/prometheus/exporter/snmp/snmp.go index 556c7229b3..5b0fd94c8a 100644 --- a/internal/component/prometheus/exporter/snmp/snmp.go +++ b/internal/component/prometheus/exporter/snmp/snmp.go @@ -6,6 +6,9 @@ import ( "slices" "time" + snmp_config "github.com/prometheus/snmp_exporter/config" + "gopkg.in/yaml.v2" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/prometheus/exporter" @@ -13,8 +16,6 @@ import ( "github.com/grafana/alloy/internal/static/integrations" "github.com/grafana/alloy/internal/static/integrations/snmp_exporter" "github.com/grafana/alloy/syntax/alloytypes" - snmp_config "github.com/prometheus/snmp_exporter/config" - "gopkg.in/yaml.v2" ) func init() { @@ -35,6 +36,7 @@ func createExporter(opts component.Options, args component.Arguments, defaultIns // buildSNMPTargets creates the exporter's discovery targets based on the defined SNMP targets. func buildSNMPTargets(baseTarget discovery.Target, args component.Arguments) []discovery.Target { + // TODO: This implementation of targets manipulation may not be optimal. If it's a hot spot, we should optimise it. var targets []discovery.Target snmpTargets := args.(Arguments).Targets @@ -44,14 +46,15 @@ func buildSNMPTargets(baseTarget discovery.Target, args component.Arguments) []d } for _, tgt := range snmpTargets { - target := make(discovery.Target) + target := make(map[string]string, len(tgt.Labels)+baseTarget.Len()) // Set extra labels first, meaning that any other labels will override for k, v := range tgt.Labels { target[k] = v } - for k, v := range baseTarget { - target[k] = v - } + baseTarget.ForEachLabel(func(key string, value string) bool { + target[key] = value + return true + }) target["job"] = target["job"] + "/" + tgt.Name target["__param_target"] = tgt.Target @@ -69,7 +72,7 @@ func buildSNMPTargets(baseTarget discovery.Target, args component.Arguments) []d target["__param_auth"] = tgt.Auth } - targets = append(targets, target) + targets = append(targets, discovery.NewTargetFromMap(target)) } return targets diff --git a/internal/component/prometheus/exporter/snmp/snmp_test.go b/internal/component/prometheus/exporter/snmp/snmp_test.go index 677d27e813..0e80bc3f90 100644 --- a/internal/component/prometheus/exporter/snmp/snmp_test.go +++ b/internal/component/prometheus/exporter/snmp/snmp_test.go @@ -341,22 +341,22 @@ func TestBuildSNMPTargets(t *testing.T) { WalkParams: "public", Auth: "public_v2"}}, WalkParams: WalkParams{{Name: "public", Retries: 2}}, } - baseTarget := discovery.Target{ + baseTarget := discovery.NewTargetFromMap(map[string]string{ model.SchemeLabel: "http", model.MetricsPathLabel: "component/prometheus.exporter.snmp.default/metrics", "instance": "prometheus.exporter.snmp.default", "job": "integrations/snmp", "__meta_agent_integration_name": "snmp", "__meta_agent_integration_instance": "prometheus.exporter.snmp.default", - } + }) args := component.Arguments(baseArgs) targets := buildSNMPTargets(baseTarget, args) require.Equal(t, 1, len(targets)) - require.Equal(t, "integrations/snmp/network_switch_1", targets[0]["job"]) - require.Equal(t, "192.168.1.2", targets[0]["__param_target"]) - require.Equal(t, "if_mib", targets[0]["__param_module"]) - require.Equal(t, "public", targets[0]["__param_walk_params"]) - require.Equal(t, "public_v2", targets[0]["__param_auth"]) + requireTargetLabel(t, targets[0], "job", "integrations/snmp/network_switch_1") + requireTargetLabel(t, targets[0], "__param_target", "192.168.1.2") + requireTargetLabel(t, targets[0], "__param_module", "if_mib") + requireTargetLabel(t, targets[0], "__param_walk_params", "public") + requireTargetLabel(t, targets[0], "__param_auth", "public_v2") } func TestUnmarshalAlloyWithInlineConfig(t *testing.T) { @@ -462,3 +462,10 @@ func TestUnmarshalAlloyWithInvalidInlineConfig(t *testing.T) { }) } } + +func requireTargetLabel(t *testing.T, target discovery.Target, label, expectedValue string) { + t.Helper() + actual, ok := target.Get(label) + require.True(t, ok) + require.Equal(t, expectedValue, actual) +} diff --git a/internal/component/prometheus/scrape/scrape.go b/internal/component/prometheus/scrape/scrape.go index ce7241c502..b95a52cc08 100644 --- a/internal/component/prometheus/scrape/scrape.go +++ b/internal/component/prometheus/scrape/scrape.go @@ -332,7 +332,7 @@ func (c *Component) distributeTargets( newLocalTargets := newDistTargets.LocalTargets() c.targetsGauge.Set(float64(len(newLocalTargets))) - promNewTargets := c.componentTargetsToPromTargetGroups(jobName, newLocalTargets) + promNewTargets := discovery.ComponentTargetsToPromTargetGroups(jobName, newLocalTargets) movedTargets := newDistTargets.MovedToRemoteInstance(oldDistributedTargets) c.movedTargetsCounter.Add(float64(len(movedTargets))) @@ -479,34 +479,27 @@ func (c *Component) DebugInfo() interface{} { } } -func (c *Component) componentTargetsToPromTargetGroups(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group { - promGroup := &targetgroup.Group{Source: jobName} - for _, tg := range tgs { - promGroup.Targets = append(promGroup.Targets, convertLabelSet(tg)) - } - - return map[string][]*targetgroup.Group{jobName: {promGroup}} -} - func (c *Component) populatePromLabels(targets []discovery.Target, jobName string, args Arguments) []*scrape.Target { + // We need to call scrape.TargetsFromGroup to reuse the rather complex logic of populating labels on targets. + allTargets := make([]*scrape.Target, 0, len(targets)) lb := labels.NewBuilder(labels.EmptyLabels()) - promTargets, errs := scrape.TargetsFromGroup( - c.componentTargetsToPromTargetGroups(jobName, targets)[jobName][0], - getPromScrapeConfigs(c.opts.ID, args), - false, /* noDefaultScrapePort - always false in this component */ - make([]*scrape.Target, len(targets)), /* targets slice to reuse */ - lb, - ) - for _, err := range errs { - level.Warn(c.opts.Logger).Log("msg", "error while populating labels of targets using prom config", "err", err) + groups := discovery.ComponentTargetsToPromTargetGroups(jobName, targets) + for _, tgs := range groups { + for _, tg := range tgs { + promTargets, errs := scrape.TargetsFromGroup( + tg, + getPromScrapeConfigs(jobName, args), + false, /* noDefaultScrapePort - always false in this component */ + make([]*scrape.Target, len(targets)), /* targets slice to reuse */ + lb, + ) + lb.Reset(labels.EmptyLabels()) + for _, err := range errs { + level.Warn(c.opts.Logger).Log("msg", "error while populating labels of targets using prom config", "err", err) + } + allTargets = append(allTargets, promTargets...) + } } - return promTargets -} -func convertLabelSet(tg discovery.Target) model.LabelSet { - lset := make(model.LabelSet, len(tg)) - for k, v := range tg { - lset[model.LabelName(k)] = model.LabelValue(v) - } - return lset + return allTargets } diff --git a/internal/component/prometheus/scrape/scrape_clustering_test.go b/internal/component/prometheus/scrape/scrape_clustering_test.go index e735a9f816..d51c393c9b 100644 --- a/internal/component/prometheus/scrape/scrape_clustering_test.go +++ b/internal/component/prometheus/scrape/scrape_clustering_test.go @@ -38,7 +38,7 @@ var ( // There is a race condition in prometheus where calls to NewManager can race over a package-global variable when // calling targetMetadataCache.registerManager(m). This is a workaround to prevent this for now. - //TODO(thampiotr): Open an issue in prometheus to fix this? + // TODO(thampiotr): Open an issue in prometheus to fix this? promManagerMutex sync.Mutex ) @@ -260,7 +260,7 @@ func setUpClusterLookup(fakeCluster *fakeCluster, assignment map[peer.Peer][]int fakeCluster.lookupMap = make(map[shard.Key][]peer.Peer) for owningPeer, ownedTargets := range assignment { for _, id := range ownedTargets { - fakeCluster.lookupMap[shard.Key(targets[id].Target().NonMetaLabels().Hash())] = []peer.Peer{owningPeer} + fakeCluster.lookupMap[shard.Key(targets[id].Target().NonMetaLabelsHash())] = []peer.Peer{owningPeer} } } } diff --git a/internal/component/pyroscope/ebpf/ebpf_linux.go b/internal/component/pyroscope/ebpf/ebpf_linux.go index b0f8a49571..9fef4b303c 100644 --- a/internal/component/pyroscope/ebpf/ebpf_linux.go +++ b/internal/component/pyroscope/ebpf/ebpf_linux.go @@ -10,16 +10,17 @@ import ( "sync" "time" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/component/pyroscope" - "github.com/grafana/alloy/internal/featuregate" - "github.com/grafana/alloy/internal/runtime/logging/level" ebpfspy "github.com/grafana/pyroscope/ebpf" demangle2 "github.com/grafana/pyroscope/ebpf/cpp/demangle" "github.com/grafana/pyroscope/ebpf/pprof" "github.com/grafana/pyroscope/ebpf/sd" "github.com/grafana/pyroscope/ebpf/symtab" "github.com/oklog/run" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" ) func init() { @@ -221,7 +222,7 @@ func (c *Component) updateDebugInfo() { func targetsOptionFromArgs(args Arguments) sd.TargetsOptions { targets := make([]sd.DiscoveryTarget, 0, len(args.Targets)) for _, t := range args.Targets { - targets = append(targets, sd.DiscoveryTarget(t)) + targets = append(targets, t.AsMap()) } return sd.TargetsOptions{ Targets: targets, diff --git a/internal/component/pyroscope/ebpf/ebpf_linux_test.go b/internal/component/pyroscope/ebpf/ebpf_linux_test.go index 394db0a67d..c59fc901d5 100644 --- a/internal/component/pyroscope/ebpf/ebpf_linux_test.go +++ b/internal/component/pyroscope/ebpf/ebpf_linux_test.go @@ -198,10 +198,10 @@ forward_to = [] expected: func() Arguments { x := NewDefaultArguments() x.Targets = []discovery.Target{ - map[string]string{ + discovery.NewTargetFromMap(map[string]string{ "container_id": "cid", "service_name": "foo", - }, + }), } x.ForwardTo = []pyroscope.Appendable{} return x @@ -224,10 +224,10 @@ collect_kernel_profile = false`, expected: func() Arguments { x := NewDefaultArguments() x.Targets = []discovery.Target{ - map[string]string{ + discovery.NewTargetFromMap(map[string]string{ "container_id": "cid", "service_name": "foo", - }, + }), } x.ForwardTo = []pyroscope.Appendable{} x.CollectInterval = time.Second * 3 diff --git a/internal/component/pyroscope/java/java.go b/internal/component/pyroscope/java/java.go index 129bafe411..c7e76f7112 100644 --- a/internal/component/pyroscope/java/java.go +++ b/internal/component/pyroscope/java/java.go @@ -126,9 +126,14 @@ func (j *javaComponent) updateTargets(args Arguments) { active := make(map[int]struct{}) for _, target := range args.Targets { - pid64, err := strconv.ParseInt(target[labelProcessID], 10, 32) + pidStr, ok := target.Get(labelProcessID) + if !ok { + _ = level.Error(j.opts.Logger).Log("msg", "could not find PID label", "pid", pidStr) + continue + } + pid64, err := strconv.ParseInt(pidStr, 10, 32) if err != nil { - _ = level.Error(j.opts.Logger).Log("msg", "could not convert process ID to a 32 bit integer", "pid", target[labelProcessID], "err", err) + _ = level.Error(j.opts.Logger).Log("msg", "could not convert process ID to a 32 bit integer", "pid", pidStr, "err", err) continue } pid := int(pid64) diff --git a/internal/component/pyroscope/java/loop.go b/internal/component/pyroscope/java/loop.go index 03664e4939..ff6acf9297 100644 --- a/internal/component/pyroscope/java/loop.go +++ b/internal/component/pyroscope/java/loop.go @@ -13,14 +13,15 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component/discovery" - "github.com/grafana/alloy/internal/component/pyroscope" - "github.com/grafana/alloy/internal/component/pyroscope/java/asprof" - "github.com/grafana/alloy/internal/runtime/logging/level" jfrpprof "github.com/grafana/jfr-parser/pprof" jfrpprofPyroscope "github.com/grafana/jfr-parser/pprof/pyroscope" "github.com/prometheus/prometheus/model/labels" gopsutil "github.com/shirou/gopsutil/v3/process" + + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/component/pyroscope/java/asprof" + "github.com/grafana/alloy/internal/runtime/logging/level" ) const spyName = "alloy.java" @@ -155,7 +156,7 @@ func (p *profilingLoop) push(jfrBytes []byte, startTime time.Time, endTime time. sz := req.Profile.SizeVT() l := log.With(p.logger, "metric", metric, "sz", sz) ls := labels.NewBuilder(nil) - for _, l := range jfrpprofPyroscope.Labels(target, profiles.JFREvent, req.Metric, "", spyName) { + for _, l := range jfrpprofPyroscope.Labels(target.AsMap(), profiles.JFREvent, req.Metric, "", spyName) { ls.Set(l.Name, l.Value) } if ls.Get(labelServiceName) == "" { diff --git a/internal/component/pyroscope/java/target.go b/internal/component/pyroscope/java/target.go index cffdf967b5..3cda7ea0ef 100644 --- a/internal/component/pyroscope/java/target.go +++ b/internal/component/pyroscope/java/target.go @@ -12,23 +12,22 @@ const ( ) func inferServiceName(target discovery.Target) string { - k8sServiceName := target[labelServiceNameK8s] - if k8sServiceName != "" { + if k8sServiceName, ok := target.Get(labelServiceNameK8s); ok { return k8sServiceName } - k8sNamespace := target["__meta_kubernetes_namespace"] - k8sContainer := target["__meta_kubernetes_pod_container_name"] - if k8sNamespace != "" && k8sContainer != "" { + k8sNamespace, nsOk := target.Get("__meta_kubernetes_namespace") + k8sContainer, contOk := target.Get("__meta_kubernetes_pod_container_name") + if nsOk && contOk { return fmt.Sprintf("java/%s/%s", k8sNamespace, k8sContainer) } - dockerContainer := target["__meta_docker_container_name"] - if dockerContainer != "" { + + if dockerContainer, ok := target.Get("__meta_docker_container_name"); ok { return dockerContainer } - if swarmService := target["__meta_dockerswarm_container_label_service_name"]; swarmService != "" { + if swarmService, ok := target.Get("__meta_dockerswarm_container_label_service_name"); ok { return swarmService } - if swarmService := target["__meta_dockerswarm_service_name"]; swarmService != "" { + if swarmService, ok := target.Get("__meta_dockerswarm_service_name"); ok { return swarmService } return "unspecified" diff --git a/internal/component/pyroscope/scrape/scrape.go b/internal/component/pyroscope/scrape/scrape.go index 301ac86053..9dab9333df 100644 --- a/internal/component/pyroscope/scrape/scrape.go +++ b/internal/component/pyroscope/scrape/scrape.go @@ -8,7 +8,6 @@ import ( "time" config_util "github.com/prometheus/common/config" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/grafana/alloy/internal/component/pyroscope" @@ -322,7 +321,7 @@ func (c *Component) Run(ctx context.Context) error { // NOTE(@tpaschalis) First approach, manually building the // 'clustered' targets implementation every time. ct := discovery.NewDistributedTargets(clusteringEnabled, c.cluster, tgs) - promTargets := c.componentTargetsToProm(jobName, ct.LocalTargets()) + promTargets := discovery.ComponentTargetsToPromTargetGroups(jobName, ct.LocalTargets()) select { case targetSetsChan <- promTargets: @@ -374,23 +373,6 @@ func (c *Component) NotifyClusterChange() { } } -func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group { - promGroup := &targetgroup.Group{Source: jobName} - for _, tg := range tgs { - promGroup.Targets = append(promGroup.Targets, convertLabelSet(tg)) - } - - return map[string][]*targetgroup.Group{jobName: {promGroup}} -} - -func convertLabelSet(tg discovery.Target) model.LabelSet { - lset := make(model.LabelSet, len(tg)) - for k, v := range tg { - lset[model.LabelName(k)] = model.LabelValue(v) - } - return lset -} - // DebugInfo implements component.DebugComponent. func (c *Component) DebugInfo() interface{} { var res []scrape.TargetStatus diff --git a/internal/component/pyroscope/scrape/scrape_loop_test.go b/internal/component/pyroscope/scrape/scrape_loop_test.go index 3fdf7969aa..5c4788cb86 100644 --- a/internal/component/pyroscope/scrape/scrape_loop_test.go +++ b/internal/component/pyroscope/scrape/scrape_loop_test.go @@ -11,9 +11,6 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component/discovery" - "github.com/grafana/alloy/internal/component/pyroscope" - "github.com/grafana/alloy/internal/util" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -21,6 +18,10 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" + + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/util" ) func TestScrapePool(t *testing.T) { @@ -28,7 +29,7 @@ func TestScrapePool(t *testing.T) { args := NewDefaultArguments() args.Targets = []discovery.Target{ - {"instance": "foo"}, + discovery.NewTargetFromMap(map[string]string{"instance": "foo"}), } args.ProfilingConfig.Block.Enabled = false args.ProfilingConfig.Goroutine.Enabled = false diff --git a/internal/component/pyroscope/scrape/scrape_test.go b/internal/component/pyroscope/scrape/scrape_test.go index 31d960da02..052526426d 100644 --- a/internal/component/pyroscope/scrape/scrape_test.go +++ b/internal/component/pyroscope/scrape/scrape_test.go @@ -10,6 +10,12 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "go.uber.org/goleak" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/prometheus/scrape" @@ -18,11 +24,6 @@ import ( http_service "github.com/grafana/alloy/internal/service/http" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" - "go.uber.org/goleak" ) func TestComponent(t *testing.T) { @@ -50,14 +51,14 @@ func TestComponent(t *testing.T) { arg.ForwardTo = []pyroscope.Appendable{pyroscope.NoopAppendable} arg.Targets = []discovery.Target{ - { + discovery.NewTargetFromMap(map[string]string{ model.AddressLabel: "foo", serviceNameLabel: "s", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ model.AddressLabel: "bar", serviceNameK8SLabel: "k", - }, + }), } c.Update(arg) @@ -99,10 +100,10 @@ func TestUnmarshalConfig(t *testing.T) { expected: func() Arguments { r := NewDefaultArguments() r.Targets = []discovery.Target{ - { + discovery.NewTargetFromMap(map[string]string{ "__address__": "localhost:9090", "foo": "bar", - }, + }), } return r }, @@ -131,14 +132,14 @@ func TestUnmarshalConfig(t *testing.T) { expected: func() Arguments { r := NewDefaultArguments() r.Targets = []discovery.Target{ - { + discovery.NewTargetFromMap(map[string]string{ "__address__": "localhost:9090", "foo": "bar", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ "__address__": "localhost:8080", "foo": "buzz", - }, + }), } r.ProfilingConfig.Block.Enabled = false r.ProfilingConfig.Custom = append(r.ProfilingConfig.Custom, CustomProfilingTarget{ @@ -266,16 +267,16 @@ func TestUpdateWhileScraping(t *testing.T) { go c.Run(ctx) args.Targets = []discovery.Target{ - { + discovery.NewTargetFromMap(map[string]string{ model.AddressLabel: address, serviceNameLabel: "s", "foo": "bar", - }, - { + }), + discovery.NewTargetFromMap(map[string]string{ model.AddressLabel: address, serviceNameK8SLabel: "k", "foo": "buz", - }, + }), } c.Update(args) @@ -290,11 +291,11 @@ func TestUpdateWhileScraping(t *testing.T) { go func() { for i := 0; i < 100; i++ { args.Targets = []discovery.Target{ - { + discovery.NewTargetFromMap(map[string]string{ model.AddressLabel: address, serviceNameLabel: "s", "foo": fmt.Sprintf("%d", i), - }, + }), } require.NoError(t, c.Update(args)) c.scraper.reload() diff --git a/internal/converter/internal/common/convert_targets.go b/internal/converter/internal/common/convert_targets.go index a6a79690c7..5b49c12d60 100644 --- a/internal/converter/internal/common/convert_targets.go +++ b/internal/converter/internal/common/convert_targets.go @@ -22,7 +22,7 @@ func NewDiscoveryExports(expr string) discovery.Exports { // as a component export string rather than the standard [discovery.Target] // AlloyTokenize. func NewDiscoveryTargets(expr string) []discovery.Target { - return []discovery.Target{map[string]string{"__expr__": expr}} + return []discovery.Target{discovery.NewTargetFromMap(map[string]string{"__expr__": expr})} } // ConvertTargets implements [builder.Tokenizer]. This allows us to set @@ -52,9 +52,9 @@ func (f ConvertTargets) AlloyTokenize() []builder.Token { toks = append(toks, builder.Token{Tok: token.LITERAL, Lit: "\n"}) } - for ix, targetMap := range f.Targets { + for ix, target := range f.Targets { keyValMap := map[string]string{} - for key, val := range targetMap { + target.ForEachLabel(func(key string, val string) bool { // __expr__ is a special key used by the converter code to specify // we should tokenize the value instead of tokenizing the map normally. // An alternative strategy would have been to add a new property for @@ -68,7 +68,8 @@ func (f ConvertTargets) AlloyTokenize() []builder.Token { } else { keyValMap[key] = val } - } + return true + }) if len(keyValMap) > 0 { expr.SetValue([]map[string]string{keyValMap}) diff --git a/internal/converter/internal/common/convert_targets_test.go b/internal/converter/internal/common/convert_targets_test.go index 2135fe20f7..1d33ef8eda 100644 --- a/internal/converter/internal/common/convert_targets_test.go +++ b/internal/converter/internal/common/convert_targets_test.go @@ -3,10 +3,11 @@ package common_test import ( "testing" + "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/converter/internal/common" "github.com/grafana/alloy/syntax/token/builder" - "github.com/stretchr/testify/require" ) func TestOptionalSecret_Write(t *testing.T) { @@ -25,14 +26,14 @@ func TestOptionalSecret_Write(t *testing.T) { { name: "empty", value: common.ConvertTargets{ - Targets: []discovery.Target{{}}, + Targets: []discovery.Target{}, }, - expect: ``, + expect: `[]`, }, { name: "__address__ key", value: common.ConvertTargets{ - Targets: []discovery.Target{{"__address__": "testing"}}, + Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{"__address__": "testing"})}, }, expect: `[{ __address__ = "testing", @@ -41,7 +42,7 @@ func TestOptionalSecret_Write(t *testing.T) { { name: "__address__ key label", value: common.ConvertTargets{ - Targets: []discovery.Target{{"__address__": "testing", "label": "value"}}, + Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{"__address__": "testing", "label": "value"})}, }, expect: `[{ __address__ = "testing", @@ -52,8 +53,8 @@ func TestOptionalSecret_Write(t *testing.T) { name: "multiple __address__ key label", value: common.ConvertTargets{ Targets: []discovery.Target{ - {"__address__": "testing", "label": "value"}, - {"__address__": "testing2", "label": "value"}, + discovery.NewTargetFromMap(map[string]string{"__address__": "testing", "label": "value"}), + discovery.NewTargetFromMap(map[string]string{"__address__": "testing2", "label": "value"}), }, }, expect: `array.concat( @@ -70,14 +71,17 @@ func TestOptionalSecret_Write(t *testing.T) { { name: "__expr__ key", value: common.ConvertTargets{ - Targets: []discovery.Target{{"__expr__": "testing"}}, + Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{"__expr__": "testing"})}, }, expect: `testing`, }, { name: "multiple __expr__ key", value: common.ConvertTargets{ - Targets: []discovery.Target{{"__expr__": "testing"}, {"__expr__": "testing2"}}, + Targets: []discovery.Target{ + discovery.NewTargetFromMap(map[string]string{"__expr__": "testing"}), + discovery.NewTargetFromMap(map[string]string{"__expr__": "testing2"}), + }, }, expect: `array.concat( testing, @@ -87,7 +91,10 @@ func TestOptionalSecret_Write(t *testing.T) { { name: "both key types", value: common.ConvertTargets{ - Targets: []discovery.Target{{"__address__": "testing", "label": "value"}, {"__expr__": "testing2"}}, + Targets: []discovery.Target{ + discovery.NewTargetFromMap(map[string]string{"__address__": "testing", "label": "value"}), + discovery.NewTargetFromMap(map[string]string{"__expr__": "testing2"}), + }, }, expect: `array.concat( [{ diff --git a/internal/converter/internal/prometheusconvert/component/scrape.go b/internal/converter/internal/prometheusconvert/component/scrape.go index 1291116ae8..c55faf0881 100644 --- a/internal/converter/internal/prometheusconvert/component/scrape.go +++ b/internal/converter/internal/prometheusconvert/component/scrape.go @@ -89,7 +89,7 @@ func getScrapeTargets(staticConfig prom_discovery.StaticConfig) []discovery.Targ targetMap[string(labelName)] = string(labelValue) newMap := map[string]string{} maps.Copy(newMap, targetMap) - targets = append(targets, newMap) + targets = append(targets, discovery.NewTargetFromMap(newMap)) } } } diff --git a/internal/converter/internal/promtailconvert/internal/build/service_discovery.go b/internal/converter/internal/promtailconvert/internal/build/service_discovery.go index c55587c6e3..2f20c9f1a3 100644 --- a/internal/converter/internal/promtailconvert/internal/build/service_discovery.go +++ b/internal/converter/internal/promtailconvert/internal/build/service_discovery.go @@ -26,7 +26,7 @@ func (s *ScrapeConfigBuilder) AppendSDs() { targetLiterals := make([]discovery.Target, 0) for _, target := range targets { - if expr, ok := target["__expr__"]; ok { + if expr, ok := target.Get("__expr__"); ok { // use the expression if __expr__ is set s.allTargetsExps = append(s.allTargetsExps, expr) } else { diff --git a/internal/converter/internal/test_common/testing.go b/internal/converter/internal/test_common/testing.go index a315f0362c..7bc06364dd 100644 --- a/internal/converter/internal/test_common/testing.go +++ b/internal/converter/internal/test_common/testing.go @@ -12,6 +12,9 @@ import ( "strings" "testing" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/converter/diag" "github.com/grafana/alloy/internal/featuregate" alloy_runtime "github.com/grafana/alloy/internal/runtime" @@ -20,9 +23,8 @@ import ( cluster_service "github.com/grafana/alloy/internal/service/cluster" http_service "github.com/grafana/alloy/internal/service/http" "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/service/livedebugging" remotecfg_service "github.com/grafana/alloy/internal/service/remotecfg" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" ) const ( @@ -214,6 +216,7 @@ func attemptLoadingAlloyConfig(t *testing.T, bb []byte) { clusterService, labelstore.New(nil, prometheus.DefaultRegisterer), remotecfgService, + livedebugging.New(), }, EnableCommunityComps: true, }) diff --git a/internal/runtime/componenttest/componenttest.go b/internal/runtime/componenttest/componenttest.go index dc5c1b9e2d..4526ba00c9 100644 --- a/internal/runtime/componenttest/componenttest.go +++ b/internal/runtime/componenttest/componenttest.go @@ -5,19 +5,21 @@ import ( "context" "fmt" "os" - "reflect" "sync" "time" - "github.com/grafana/alloy/internal/service/labelstore" - "github.com/grafana/alloy/internal/service/livedebugging" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" + "github.com/grafana/alloy/internal/runtime/equality" + "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/service/livedebugging" + "github.com/go-kit/log" + "go.opentelemetry.io/otel/trace/noop" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/runtime/logging" - "go.opentelemetry.io/otel/trace/noop" ) // A Controller is a testing controller which controls a single component. @@ -66,7 +68,7 @@ func NewControllerFromReg(l log.Logger, reg component.Registration) *Controller func (c *Controller) onStateChange(e component.Exports) { c.exportsMut.Lock() - changed := !reflect.DeepEqual(c.exports, e) + changed := !equality.DeepEqual(c.exports, e) c.exports = e c.exportsMut.Unlock() diff --git a/internal/runtime/equality/deep_equal.go b/internal/runtime/equality/deep_equal.go new file mode 100644 index 0000000000..bb8ee4429b --- /dev/null +++ b/internal/runtime/equality/deep_equal.go @@ -0,0 +1,2 @@ +package equality + diff --git a/internal/runtime/equality/equality.go b/internal/runtime/equality/equality.go new file mode 100644 index 0000000000..e2cd4204bb --- /dev/null +++ b/internal/runtime/equality/equality.go @@ -0,0 +1,123 @@ +package equality + +import ( + "reflect" +) + +var customEqualityType = reflect.TypeOf((*CustomEquality)(nil)).Elem() + +// CustomEquality allows to define custom Equals implementation. This can be used, for example, with exported types, +// so that the Runtime can short-circuit propagating updates when it is not necessary. +type CustomEquality interface { + Equals(other any) bool +} + +// DeepEqual is a wrapper around reflect.DeepEqual, which first checks if arguments implement CustomEquality. If they +// do, their Equals method is used for comparison instead of reflect.DeepEqual. +// For simplicity, DeepEqual requires x and y to be of the same type before calling CustomEquality.Equals. +// TODO(thampiotr): this will need a lot of tests!!! +func DeepEqual(x, y any) bool { + if x == nil || y == nil { + return x == y + } + v1 := reflect.ValueOf(x) + v2 := reflect.ValueOf(y) + + // See if we can compare them using CustomEquality + if r := deepCustomEqual(v1, v2); r.compared { + return r.isEqual + } + // Otherwise fall back to reflect.DeepEqual + return reflect.DeepEqual(x, y) +} + +type result struct { + compared bool + isEqual bool +} + +func successfulCompare(isEqual bool) result { return result{compared: true, isEqual: isEqual} } + +var ( + couldNotCompare = result{compared: false, isEqual: false} + comparedAndEqual = result{compared: true, isEqual: true} +) + +func deepCustomEqual(v1, v2 reflect.Value) result { + if !v1.IsValid() || !v2.IsValid() { + return couldNotCompare + } + + if v1.Type() != v2.Type() { + return couldNotCompare + } + + if v1.Type().Implements(customEqualityType) { + return successfulCompare(v1.Interface().(CustomEquality).Equals(v2.Interface())) + } + + // Somewhat redundant, but just in case: + if v2.Type().Implements(customEqualityType) { + return successfulCompare(v2.Interface().(CustomEquality).Equals(v1.Interface())) + } + + switch v1.Kind() { + case reflect.Array: + for i := 0; i < v1.Len(); i++ { + partResult := deepCustomEqual(v1.Index(i), v2.Index(i)) + if !partResult.compared || !partResult.isEqual { + return partResult + } + } + return comparedAndEqual + case reflect.Slice: + if v1.IsNil() != v2.IsNil() { + return couldNotCompare + } + if v1.Len() != v2.Len() { + return couldNotCompare + } + for i := 0; i < v1.Len(); i++ { + partResult := deepCustomEqual(v1.Index(i), v2.Index(i)) + if !partResult.compared || !partResult.isEqual { + return partResult + } + } + return comparedAndEqual + case reflect.Interface, reflect.Pointer: + if v1.IsNil() || v2.IsNil() { + return couldNotCompare + } + return deepCustomEqual(v1.Elem(), v2.Elem()) + case reflect.Struct: + for i, n := 0, v1.NumField(); i < n; i++ { + partResult := deepCustomEqual(v1.Field(i), v2.Field(i)) + if !partResult.compared || !partResult.isEqual { + return partResult + } + } + return comparedAndEqual + case reflect.Map: + if v1.IsNil() != v2.IsNil() { + return couldNotCompare + } + if v1.Len() != v2.Len() { + return couldNotCompare + } + iter := v1.MapRange() + for iter.Next() { + val1 := iter.Value() + val2 := v2.MapIndex(iter.Key()) + if !val1.IsValid() || !val2.IsValid() { + return couldNotCompare + } + partResult := deepCustomEqual(val1, val2) + if !partResult.compared || !partResult.isEqual { + return partResult + } + } + return comparedAndEqual + default: + return couldNotCompare + } +} diff --git a/internal/runtime/internal/controller/node_builtin_component.go b/internal/runtime/internal/controller/node_builtin_component.go index 43e4893a2f..cc6fba1fa2 100644 --- a/internal/runtime/internal/controller/node_builtin_component.go +++ b/internal/runtime/internal/controller/node_builtin_component.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/logging" "github.com/grafana/alloy/internal/runtime/tracing" "github.com/grafana/alloy/syntax/ast" @@ -282,7 +283,7 @@ func (cn *BuiltinComponentNode) evaluate(scope *vm.Scope) error { return nil } - if reflect.DeepEqual(cn.args, argsCopyValue) { + if equality.DeepEqual(cn.args, argsCopyValue) { // Ignore components which haven't changed. This reduces the cost of // calling evaluate for components where evaluation is expensive (e.g., if // re-evaluating requires re-starting some internal logic). @@ -371,7 +372,7 @@ func (cn *BuiltinComponentNode) setExports(e component.Exports) { var changed bool cn.exportsMut.Lock() - if !reflect.DeepEqual(cn.exports, e) { + if !equality.DeepEqual(cn.exports, e) { changed = true cn.exports = e } diff --git a/internal/runtime/internal/controller/node_custom_component.go b/internal/runtime/internal/controller/node_custom_component.go index 52fff29bc0..a06f4b8193 100644 --- a/internal/runtime/internal/controller/node_custom_component.go +++ b/internal/runtime/internal/controller/node_custom_component.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "path" - "reflect" "strings" "sync" "time" @@ -12,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" ) @@ -264,7 +264,7 @@ func (cn *CustomComponentNode) setExports(e component.Exports) { var changed bool cn.exportsMut.Lock() - if !reflect.DeepEqual(cn.exports, e) { + if !equality.DeepEqual(cn.exports, e) { changed = true cn.exports = e } diff --git a/internal/runtime/internal/controller/node_service.go b/internal/runtime/internal/controller/node_service.go index 85619b2d34..899cb10c47 100644 --- a/internal/runtime/internal/controller/node_service.go +++ b/internal/runtime/internal/controller/node_service.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/service" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" @@ -104,7 +105,7 @@ func (sn *ServiceNode) Evaluate(scope *vm.Scope) error { // since services expect a non-pointer. argsCopyValue := reflect.ValueOf(argsPointer).Elem().Interface() - if reflect.DeepEqual(sn.args, argsCopyValue) { + if equality.DeepEqual(sn.args, argsCopyValue) { // Ignore arguments which haven't changed. This reduces the cost of calling // evaluate for services where evaluation is expensive (e.g., if // re-evaluating requires re-starting some internal logic). diff --git a/internal/runtime/internal/controller/value_cache.go b/internal/runtime/internal/controller/value_cache.go index 009c592d74..f1319db82b 100644 --- a/internal/runtime/internal/controller/value_cache.go +++ b/internal/runtime/internal/controller/value_cache.go @@ -2,10 +2,10 @@ package controller import ( "fmt" - "reflect" "sync" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/vm" ) @@ -96,7 +96,7 @@ func (vc *valueCache) CacheModuleExportValue(name string, value any) { v, found := vc.moduleExports[name] if !found { vc.moduleChangedIndex++ - } else if !reflect.DeepEqual(v, value) { + } else if !equality.DeepEqual(v, value) { vc.moduleChangedIndex++ } diff --git a/internal/runtime/internal/importsource/import_file.go b/internal/runtime/internal/importsource/import_file.go index e4691d9ed5..d0e10e02c6 100644 --- a/internal/runtime/internal/importsource/import_file.go +++ b/internal/runtime/internal/importsource/import_file.go @@ -7,14 +7,15 @@ import ( "io/fs" "os" "path/filepath" - "reflect" "strings" "sync" "time" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" filedetector "github.com/grafana/alloy/internal/filedetector" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax/vm" @@ -84,7 +85,7 @@ func (im *ImportFile) Evaluate(scope *vm.Scope) error { return fmt.Errorf("decoding configuration: %w", err) } - if reflect.DeepEqual(im.args, arguments) { + if equality.DeepEqual(im.args, arguments) { return nil } im.args = arguments diff --git a/internal/runtime/internal/importsource/import_git.go b/internal/runtime/internal/importsource/import_git.go index 02ee1ed675..29f3ef8b22 100644 --- a/internal/runtime/internal/importsource/import_git.go +++ b/internal/runtime/internal/importsource/import_git.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "path/filepath" - "reflect" "strings" "sync" "time" @@ -13,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/vcs" "github.com/grafana/alloy/syntax" @@ -93,7 +93,7 @@ func (im *ImportGit) Evaluate(scope *vm.Scope) error { return fmt.Errorf("decoding configuration: %w", err) } - if reflect.DeepEqual(im.args, arguments) { + if equality.DeepEqual(im.args, arguments) { return nil } @@ -208,7 +208,7 @@ func (im *ImportGit) Update(args component.Arguments) (err error) { // Create or update the repo field. // Failure to update repository makes the module loader temporarily use cached contents on disk - if im.repo == nil || !reflect.DeepEqual(repoOpts, im.repoOpts) { + if im.repo == nil || !equality.DeepEqual(repoOpts, im.repoOpts) { r, err := vcs.NewGitRepo(context.Background(), im.repoPath, repoOpts) if err != nil { if errors.As(err, &vcs.UpdateFailedError{}) { diff --git a/internal/runtime/internal/importsource/import_http.go b/internal/runtime/internal/importsource/import_http.go index 815748f8f6..73f19eced0 100644 --- a/internal/runtime/internal/importsource/import_http.go +++ b/internal/runtime/internal/importsource/import_http.go @@ -5,12 +5,12 @@ import ( "fmt" "net/http" "path" - "reflect" "time" "github.com/grafana/alloy/internal/component" common_config "github.com/grafana/alloy/internal/component/common/config" remote_http "github.com/grafana/alloy/internal/component/remote/http" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/vm" ) @@ -84,7 +84,7 @@ func (im *ImportHTTP) Evaluate(scope *vm.Scope) error { im.arguments = arguments } - if reflect.DeepEqual(im.arguments, arguments) { + if equality.DeepEqual(im.arguments, arguments) { return nil } diff --git a/internal/runtime/internal/importsource/import_string.go b/internal/runtime/internal/importsource/import_string.go index a8a1249fc4..e1ec0644eb 100644 --- a/internal/runtime/internal/importsource/import_string.go +++ b/internal/runtime/internal/importsource/import_string.go @@ -3,9 +3,9 @@ package importsource import ( "context" "fmt" - "reflect" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/alloytypes" "github.com/grafana/alloy/syntax/vm" ) @@ -37,7 +37,7 @@ func (im *ImportString) Evaluate(scope *vm.Scope) error { return fmt.Errorf("decoding configuration: %w", err) } - if reflect.DeepEqual(im.arguments, arguments) { + if equality.DeepEqual(im.arguments, arguments) { return nil } im.arguments = arguments diff --git a/internal/runtime/internal/testcomponents/module/git/git.go b/internal/runtime/internal/testcomponents/module/git/git.go index b4cf2fb984..12100d2bfb 100644 --- a/internal/runtime/internal/testcomponents/module/git/git.go +++ b/internal/runtime/internal/testcomponents/module/git/git.go @@ -5,13 +5,14 @@ import ( "context" "errors" "path/filepath" - "reflect" "sync" "time" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/internal/testcomponents/module" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/vcs" @@ -199,7 +200,7 @@ func (c *Component) Update(args component.Arguments) (err error) { // Create or update the repo field. // Failure to update repository makes the module loader temporarily use cached contents on disk - if c.repo == nil || !reflect.DeepEqual(repoOpts, c.repoOpts) { + if c.repo == nil || !equality.DeepEqual(repoOpts, c.repoOpts) { r, err := vcs.NewGitRepo(context.Background(), repoPath, repoOpts) if err != nil { if errors.As(err, &vcs.UpdateFailedError{}) { diff --git a/internal/runtime/internal/testcomponents/module/module.go b/internal/runtime/internal/testcomponents/module/module.go index f7a420b2c4..f9d3c81d1f 100644 --- a/internal/runtime/internal/testcomponents/module/module.go +++ b/internal/runtime/internal/testcomponents/module/module.go @@ -3,11 +3,11 @@ package module import ( "context" "fmt" - "reflect" "sync" "time" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/logging/level" ) @@ -44,7 +44,7 @@ func NewModuleComponent(o component.Options) (*ModuleComponent, error) { // It will set the component health in addition to return the error so that the consumer can rely on either or both. // If the content is the same as the last time it was successfully loaded, it will not be reloaded. func (c *ModuleComponent) LoadAlloySource(args map[string]any, contentValue string) error { - if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() { + if equality.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() { return nil } diff --git a/internal/service/ui/ui.go b/internal/service/ui/ui.go index ea69448a82..2832f642bf 100644 --- a/internal/service/ui/ui.go +++ b/internal/service/ui/ui.go @@ -8,6 +8,7 @@ import ( "path" "github.com/gorilla/mux" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/service" http_service "github.com/grafana/alloy/internal/service/http" diff --git a/internal/static/traces/promsdprocessor/consumer/consumer.go b/internal/static/traces/promsdprocessor/consumer/consumer.go index c96b79e024..3bddc9862b 100644 --- a/internal/static/traces/promsdprocessor/consumer/consumer.go +++ b/internal/static/traces/promsdprocessor/consumer/consumer.go @@ -10,13 +10,14 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/alloy/internal/component/discovery" "github.com/prometheus/common/model" "go.opentelemetry.io/collector/client" otelconsumer "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + + "github.com/grafana/alloy/internal/component/discovery" ) const ( @@ -27,7 +28,7 @@ const ( // OperationTypeUpsert does both of above OperationTypeUpsert = "upsert" - //TODO: It'd be cleaner to get these from the otel semver package? + // TODO: It'd be cleaner to get these from the otel semver package? // Not all are in semver though. E.g. "k8s.pod.ip" is internal inside the k8sattributesprocessor. PodAssociationIPLabel = "ip" PodAssociationOTelIPLabel = "net.host.ip" @@ -160,20 +161,21 @@ func (c *Consumer) processAttributes(ctx context.Context, attrs pcommon.Map) { return } - for _, label := range labels.Labels() { + labels.ForEachLabel(func(label string, value string) bool { switch c.opts.OperationType { case OperationTypeUpsert: - attrs.PutStr(label.Name, label.Value) + attrs.PutStr(label, value) case OperationTypeInsert: - if _, ok := attrs.Get(label.Name); !ok { - attrs.PutStr(label.Name, label.Value) + if _, ok := attrs.Get(label); !ok { + attrs.PutStr(label, value) } case OperationTypeUpdate: - if toVal, ok := attrs.Get(label.Name); ok { - toVal.SetStr(label.Value) + if toVal, ok := attrs.Get(label); ok { + toVal.SetStr(value) } } - } + return true + }) } func (c *Consumer) getPodIP(ctx context.Context, attrs pcommon.Map) string { @@ -230,9 +232,9 @@ func (c *Consumer) getConnectionIP(ctx context.Context) string { } func GetHostFromLabels(labels discovery.Target) (string, error) { - address, ok := labels[model.AddressLabel] + address, ok := labels.Get(model.AddressLabel) if !ok { - return "", fmt.Errorf("unable to find address in labels %q", labels.Labels()) + return "", fmt.Errorf("unable to find address in labels %q", labels) } host := address @@ -247,12 +249,6 @@ func GetHostFromLabels(labels discovery.Target) (string, error) { return host, nil } -func NewTargetsWithNonInternalLabels(labels discovery.Target) discovery.Target { - res := make(discovery.Target) - for k, v := range labels { - if !strings.HasPrefix(k, "__") { - res[k] = v - } - } - return res +func NewTargetsWithNonInternalLabels(target discovery.Target) discovery.Target { + return discovery.NewTargetFromLabelSet(target.NonReservedLabelSet()) } diff --git a/internal/static/traces/promsdprocessor/consumer/consumer_test.go b/internal/static/traces/promsdprocessor/consumer/consumer_test.go index 758b4095e5..4d9ad80f92 100644 --- a/internal/static/traces/promsdprocessor/consumer/consumer_test.go +++ b/internal/static/traces/promsdprocessor/consumer/consumer_test.go @@ -5,14 +5,15 @@ import ( "net" "testing" - "github.com/grafana/alloy/internal/component/discovery" - "github.com/grafana/alloy/internal/util" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "gotest.tools/assert" + + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/util" ) func TestOperationType(t *testing.T) { @@ -84,9 +85,9 @@ func TestOperationType(t *testing.T) { } consumerOpts := Options{ HostLabels: map[string]discovery.Target{ - attrIP: { + attrIP: discovery.NewTargetFromMap(map[string]string{ attrKey: tc.newValue, - }, + }), }, OperationType: tc.operationType, PodAssociations: podAssociations, diff --git a/internal/static/traces/promsdprocessor/prom_sd_processor.go b/internal/static/traces/promsdprocessor/prom_sd_processor.go index b916705581..92ad2a218d 100644 --- a/internal/static/traces/promsdprocessor/prom_sd_processor.go +++ b/internal/static/traces/promsdprocessor/prom_sd_processor.go @@ -178,11 +178,7 @@ func (p *promServiceDiscoProcessor) syncTargets(jobName string, group *targetgro continue } - var labels = make(discovery.Target) - for k, v := range processedLabels.Map() { - labels[k] = v - } - + var labels = discovery.NewTargetFromModelLabels(processedLabels) host, err := promsdconsumer.GetHostFromLabels(labels) if err != nil { level.Warn(p.logger).Log("msg", "ignoring target, unable to find address", "err", err) diff --git a/internal/static/traces/promsdprocessor/prom_sd_processor_test.go b/internal/static/traces/promsdprocessor/prom_sd_processor_test.go index 3641168149..3113a42306 100644 --- a/internal/static/traces/promsdprocessor/prom_sd_processor_test.go +++ b/internal/static/traces/promsdprocessor/prom_sd_processor_test.go @@ -4,11 +4,12 @@ import ( "testing" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component/discovery" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/assert" + + "github.com/grafana/alloy/internal/component/discovery" ) func TestSyncGroups(t *testing.T) { @@ -38,7 +39,7 @@ func TestSyncGroups(t *testing.T) { }, }, expected: map[string]discovery.Target{ - "127.0.0.1": {}, + "127.0.0.1": discovery.EmptyTarget, }, }, { @@ -54,9 +55,9 @@ func TestSyncGroups(t *testing.T) { }, }, expected: map[string]discovery.Target{ - "127.0.0.1": { + "127.0.0.1": discovery.NewTargetFromMap(map[string]string{ "label": "val", - }, + }), }, }, { @@ -72,9 +73,9 @@ func TestSyncGroups(t *testing.T) { }, }, expected: map[string]discovery.Target{ - "127.0.0.1": { + "127.0.0.1": discovery.NewTargetFromMap(map[string]string{ "label": "val", - }, + }), }, }, { @@ -90,7 +91,7 @@ func TestSyncGroups(t *testing.T) { }, }, expected: map[string]discovery.Target{ - "127.0.0.1": {}, + "127.0.0.1": discovery.EmptyTarget, }, }, } diff --git a/internal/util/testtarget/test_target.go b/internal/util/testtarget/test_target.go index 0dab941635..12afbff640 100644 --- a/internal/util/testtarget/test_target.go +++ b/internal/util/testtarget/test_target.go @@ -45,9 +45,9 @@ func (t *TestTarget) AddHistogram(opts prometheus.HistogramOpts) prometheus.Hist } func (t *TestTarget) Target() discovery.Target { - return discovery.Target{ + return discovery.NewTargetFromMap(map[string]string{ "__address__": t.server.Listener.Addr().String(), - } + }) } func (t *TestTarget) Registry() *prometheus.Registry { diff --git a/syntax/encoding/alloyjson/alloyjson.go b/syntax/encoding/alloyjson/alloyjson.go index 28ab68506b..054aa22c6b 100644 --- a/syntax/encoding/alloyjson/alloyjson.go +++ b/syntax/encoding/alloyjson/alloyjson.go @@ -281,34 +281,46 @@ func buildJSONValue(v value.Value) jsonValue { return jsonValue{Type: "array", Value: elements} case value.TypeObject: - keys := v.Keys() - - // If v isn't an ordered object (i.e., a go map), sort the keys so they - // have a deterministic print order. - if !v.OrderedKeys() { - sort.Strings(keys) - } - - fields := []jsonObjectField{} - - for i := 0; i < len(keys); i++ { - field, _ := v.Key(keys[i]) - - fields = append(fields, jsonObjectField{ - Key: keys[i], - Value: buildJSONValue(field), - }) - } - - return jsonValue{Type: "object", Value: fields} + return tokenizeObject(v) case value.TypeFunction: return jsonValue{Type: "function", Value: v.Describe()} case value.TypeCapsule: + if v.Implements(reflect.TypeFor[value.ConvertibleIntoCapsule]()) { + // Check if this capsule can be converted into Alloy object for more detailed description: + newVal := make(map[string]value.Value) + if err := v.ReflectAddr().Interface().(value.ConvertibleIntoCapsule).ConvertInto(&newVal); err == nil { + return tokenizeObject(value.Encode(newVal)) + } + } + // Otherwise, describe the value return jsonValue{Type: "capsule", Value: v.Describe()} default: panic(fmt.Sprintf("syntax/encoding/alloyjson: unrecognized value type %q", v.Type())) } } + +func tokenizeObject(v value.Value) jsonValue { + keys := v.Keys() + + // If v isn't an ordered object (i.e., a go map), sort the keys so they + // have a deterministic print order. + if !v.OrderedKeys() { + sort.Strings(keys) + } + + fields := []jsonObjectField{} + + for i := 0; i < len(keys); i++ { + field, _ := v.Key(keys[i]) + + fields = append(fields, jsonObjectField{ + Key: keys[i], + Value: buildJSONValue(field), + }) + } + + return jsonValue{Type: "object", Value: fields} +} diff --git a/syntax/encoding/alloyjson/alloyjson_test.go b/syntax/encoding/alloyjson/alloyjson_test.go index 2e9bf65269..36aa1786e2 100644 --- a/syntax/encoding/alloyjson/alloyjson_test.go +++ b/syntax/encoding/alloyjson/alloyjson_test.go @@ -1,12 +1,14 @@ package alloyjson_test import ( + "fmt" "testing" + "github.com/stretchr/testify/require" + "github.com/grafana/alloy/syntax" "github.com/grafana/alloy/syntax/alloytypes" "github.com/grafana/alloy/syntax/encoding/alloyjson" - "github.com/stretchr/testify/require" ) func TestValues(t *testing.T) { @@ -89,6 +91,20 @@ func TestValues(t *testing.T) { input: alloytypes.Secret("foo"), expectJSON: `{ "type": "capsule", "value": "(secret)" }`, }, + { + name: "mappable capsule", + input: capsuleConvertibleToObject{ + name: "Scrooge McDuck", + address: "Duckburg, Killmotor Hill", + }, + expectJSON: `{ + "type": "object", + "value": [ + { "key": "address", "value": { "type": "string", "value": "Duckburg, Killmotor Hill" }}, + { "key": "name", "value": { "type": "string", "value": "Scrooge McDuck" }} + ] + }`, + }, { // nil arrays and objects must always be [] instead of null as that's // what the API definition says they should be. @@ -361,3 +377,28 @@ func TestRawMap_Capsule(t *testing.T) { require.NoError(t, err) require.JSONEq(t, expect, string(bb)) } + +type capsuleConvertibleToObject struct { + name string + address string +} + +func (c capsuleConvertibleToObject) ConvertInto(dst interface{}) error { + switch dst := dst.(type) { + case *map[string]syntax.Value: + result := map[string]syntax.Value{ + "name": syntax.ValueFromString(c.name), + "address": syntax.ValueFromString(c.address), + } + *dst = result + return nil + } + return fmt.Errorf("capsuleConvertibleToObject: conversion to '%T' is not supported", dst) +} + +func (c capsuleConvertibleToObject) AlloyCapsule() {} + +var ( + _ syntax.Capsule = capsuleConvertibleToObject{} + _ syntax.ConvertibleIntoCapsule = capsuleConvertibleToObject{} +) diff --git a/syntax/internal/value/value.go b/syntax/internal/value/value.go index 829449370e..69599c0a29 100644 --- a/syntax/internal/value/value.go +++ b/syntax/internal/value/value.go @@ -199,17 +199,10 @@ func (v Value) Text() string { panic("syntax/value: Text called on non-string type") } - // Attempt to get an address to v.rv for interface checking. - // - // The normal v.rv value is used for other checks. - addrRV := v.rv - if addrRV.CanAddr() { - addrRV = addrRV.Addr() - } switch { - case addrRV.Type().Implements(goTextMarshaler): + case v.Implements(goTextMarshaler): // TODO(rfratto): what should we do if this fails? - text, _ := addrRV.Interface().(encoding.TextMarshaler).MarshalText() + text, _ := v.ReflectAddr().Interface().(encoding.TextMarshaler).MarshalText() return string(text) case v.rv.Type() == goDuration: @@ -221,6 +214,10 @@ func (v Value) Text() string { } } +func (v Value) IsString() bool { + return v.Type() == TypeString +} + // Len returns the length of v. Panics if v is not an array or object. func (v Value) Len() int { switch v.ty { @@ -258,9 +255,23 @@ func (v Value) Interface() interface{} { return v.rv.Interface() } +func (v Value) Implements(t reflect.Type) bool { + return v.ReflectAddr().Type().Implements(t) +} + // Reflect returns the raw reflection value backing v. func (v Value) Reflect() reflect.Value { return v.rv } +// ReflectAddr is like Reflect, but attempts to get an address of the raw reflection value where possible. +func (v Value) ReflectAddr() reflect.Value { + // Attempt to get an address to v.rv + addrRV := v.rv + if addrRV.CanAddr() { + addrRV = addrRV.Addr() + } + return addrRV +} + // makeValue converts a reflect value into a Value, dereferencing any pointers or // interface{} values. func makeValue(v reflect.Value) Value { diff --git a/syntax/token/builder/builder_test.go b/syntax/token/builder/builder_test.go index 47b095dd61..35e351ee13 100644 --- a/syntax/token/builder/builder_test.go +++ b/syntax/token/builder/builder_test.go @@ -6,11 +6,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/syntax" "github.com/grafana/alloy/syntax/parser" "github.com/grafana/alloy/syntax/printer" "github.com/grafana/alloy/syntax/token" "github.com/grafana/alloy/syntax/token/builder" - "github.com/stretchr/testify/require" ) func TestBuilder_File(t *testing.T) { @@ -43,6 +45,31 @@ func TestBuilder_File(t *testing.T) { require.Equal(t, expect, string(f.Bytes())) } +type capsuleConvertibleToObject struct { + name string + address string +} + +func (c capsuleConvertibleToObject) ConvertInto(dst interface{}) error { + switch dst := dst.(type) { + case *map[string]syntax.Value: + result := map[string]syntax.Value{ + "name": syntax.ValueFromString(c.name), + "address": syntax.ValueFromString(c.address), + } + *dst = result + return nil + } + return fmt.Errorf("capsuleConvertibleToObject: conversion to '%T' is not supported", dst) +} + +func (c capsuleConvertibleToObject) AlloyCapsule() {} + +var ( + _ syntax.Capsule = capsuleConvertibleToObject{} + _ syntax.ConvertibleIntoCapsule = capsuleConvertibleToObject{} +) + func TestBuilder_GoEncode(t *testing.T) { f := builder.NewFile() @@ -55,7 +82,17 @@ func TestBuilder_GoEncode(t *testing.T) { f.Body().SetAttributeValue("bool", true) f.Body().SetAttributeValue("list", []int{0, 1, 2}) f.Body().SetAttributeValue("func", func(int, int) int { return 0 }) + f.Body().AppendTokens([]builder.Token{{token.LITERAL, "\n"}}) + f.Body().SetAttributeValue("capsule", make(chan int)) + f.Body().SetAttributeValue("mappable_capsule", capsuleConvertibleToObject{ + name: "Bert", + address: "11a Sesame St", + }) + f.Body().SetAttributeValue("mappable_capsule_ptr", &capsuleConvertibleToObject{ + name: "Ernie", + address: "11b Sesame St", + }) f.Body().AppendTokens([]builder.Token{{token.LITERAL, "\n"}}) f.Body().SetAttributeValue("map", map[string]interface{}{"foo": "bar"}) @@ -73,12 +110,21 @@ func TestBuilder_GoEncode(t *testing.T) { // Hello, world! null_value = null - num = 15 - string = "Hello, world!" - bool = true - list = [0, 1, 2] - func = function - capsule = capsule("chan int") + num = 15 + string = "Hello, world!" + bool = true + list = [0, 1, 2] + func = function + + capsule = capsule("chan int") + mappable_capsule = { + address = "11a Sesame St", + name = "Bert", + } + mappable_capsule_ptr = { + address = "11b Sesame St", + name = "Ernie", + } map = { foo = "bar", diff --git a/syntax/token/builder/value_tokens.go b/syntax/token/builder/value_tokens.go index e07b187a39..ca872246d2 100644 --- a/syntax/token/builder/value_tokens.go +++ b/syntax/token/builder/value_tokens.go @@ -2,6 +2,7 @@ package builder import ( "fmt" + "reflect" "sort" "github.com/grafana/alloy/syntax/internal/value" @@ -57,35 +58,25 @@ func valueTokens(v value.Value) []Token { toks = append(toks, Token{token.RBRACK, ""}) case value.TypeObject: - toks = append(toks, Token{token.LCURLY, ""}, Token{token.LITERAL, "\n"}) - - keys := v.Keys() - - // If v isn't an ordered object (i.e., a go map), sort the keys so they - // have a deterministic print order. - if !v.OrderedKeys() { - sort.Strings(keys) - } - - for i := 0; i < len(keys); i++ { - if scanner.IsValidIdentifier(keys[i]) { - toks = append(toks, Token{token.IDENT, keys[i]}) - } else { - toks = append(toks, Token{token.STRING, fmt.Sprintf("%q", keys[i])}) - } - - field, _ := v.Key(keys[i]) - toks = append(toks, Token{token.ASSIGN, ""}) - toks = append(toks, valueTokens(field)...) - toks = append(toks, Token{token.COMMA, ""}, Token{token.LITERAL, "\n"}) - } - toks = append(toks, Token{token.RCURLY, ""}) + toks = objectTokens(v) case value.TypeFunction: toks = append(toks, Token{token.LITERAL, v.Describe()}) case value.TypeCapsule: - toks = append(toks, Token{token.LITERAL, v.Describe()}) + done := false + if v.Implements(reflect.TypeFor[value.ConvertibleIntoCapsule]()) { + // Check if this capsule can be converted into Alloy object for more detailed description: + newVal := make(map[string]value.Value) + if err := v.ReflectAddr().Interface().(value.ConvertibleIntoCapsule).ConvertInto(&newVal); err == nil { + toks = tokenEncode(newVal) + done = true + } + } + if !done { + // Default to Describe() for capsules that don't support other representation. + toks = append(toks, Token{token.LITERAL, v.Describe()}) + } default: panic(fmt.Sprintf("syntax/token/builder: unrecognized value type %q", v.Type())) @@ -93,3 +84,30 @@ func valueTokens(v value.Value) []Token { return toks } + +func objectTokens(v value.Value) []Token { + toks := []Token{{token.LCURLY, ""}, {token.LITERAL, "\n"}} + + keys := v.Keys() + + // If v isn't an ordered object (i.e. it is a go map), sort the keys so they + // have a deterministic print order. + if !v.OrderedKeys() { + sort.Strings(keys) + } + + for i := 0; i < len(keys); i++ { + if scanner.IsValidIdentifier(keys[i]) { + toks = append(toks, Token{token.IDENT, keys[i]}) + } else { + toks = append(toks, Token{token.STRING, fmt.Sprintf("%q", keys[i])}) + } + + field, _ := v.Key(keys[i]) + toks = append(toks, Token{token.ASSIGN, ""}) + toks = append(toks, valueTokens(field)...) + toks = append(toks, Token{token.COMMA, ""}, Token{token.LITERAL, "\n"}) + } + toks = append(toks, Token{token.RCURLY, ""}) + return toks +} diff --git a/syntax/types.go b/syntax/types.go index 41afef0d7b..8d39c6119f 100644 --- a/syntax/types.go +++ b/syntax/types.go @@ -95,3 +95,9 @@ type ConvertibleIntoCapsule interface { // available. Other errors are treated as an Alloy decoding error. ConvertInto(dst interface{}) error } + +// Value represents an Alloy value. See the value.Value for more details. +type Value = value.Value + +// ValueFromString creates a new Value from a given string. +var ValueFromString = value.String