Commit 498ac9d: more cleanup

thampiotr committed Jan 30, 2025
1 parent d8abc6c commit 498ac9d
Showing 10 changed files with 40 additions and 91 deletions.
2 changes: 1 addition & 1 deletion internal/component/beyla/ebpf/beyla_linux.go
@@ -273,7 +273,7 @@ func (c *Component) Update(args component.Arguments) error {
 func (c *Component) baseTarget() (discovery.Target, error) {
 	data, err := c.opts.GetServiceData(http_service.ServiceName)
 	if err != nil {
-		return discovery.NewEmptyTarget(), fmt.Errorf("failed to get HTTP information: %w", err)
+		return discovery.EmptyTarget, fmt.Errorf("failed to get HTTP information: %w", err)
 	}
 	httpData := data.(http_service.Data)

@@ -165,7 +165,7 @@ func (c *Component) Run(ctx context.Context) error {
 func (c *Component) getBaseTarget() (discovery.Target, error) {
 	data, err := c.opts.GetServiceData(http_service.ServiceName)
 	if err != nil {
-		return discovery.NewEmptyTarget(), fmt.Errorf("failed to get HTTP information: %w", err)
+		return discovery.EmptyTarget, fmt.Errorf("failed to get HTTP information: %w", err)
 	}
 	httpData := data.(http_service.Data)

1 change: 0 additions & 1 deletion internal/component/discovery/discovery.go
@@ -229,7 +229,6 @@ func toAlloyTargets(cache map[string]*targetgroup.Group) []Target {

 	for _, group := range cache {
 		for _, target := range group.Targets {
-			// TODO(thampiotr): how many have group.Labels empty?
 			allTargets = append(allTargets, NewTargetFromSpecificAndBaseLabelSet(target, group.Labels))
 		}
 	}

90 changes: 29 additions & 61 deletions internal/component/discovery/target.go
@@ -37,7 +37,7 @@
 func ComponentTargetsToPromTargetGroups(jobName string, tgs []Target) map[string][]*targetgroup.Group {
 	targetsWithCommonGroupLabels := map[uint64][]Target{}
 	for _, t := range tgs {
-		fp := t.GroupLabelsHash() // TODO(thampiotr): if worried about collisions we can use string?
+		fp := t.groupLabelsHash() // TODO(thampiotr): could use a cache if it's on exactly the same slice
 		targetsWithCommonGroupLabels[fp] = append(targetsWithCommonGroupLabels[fp], t)
 	}
@@ -59,8 +59,10 @@ func ComponentTargetsToPromTargetGroups(jobName string, tgs []Target) map[string][]*targetgroup.Group {
 	return allGroups
 }

-func NewEmptyTarget() Target {
-	return NewTargetFromLabelSet(make(commonlabels.LabelSet))
+var EmptyTarget = Target{
+	group: commonlabels.LabelSet{},
+	own:   commonlabels.LabelSet{},
+	size:  0,
 }

 func NewTargetFromLabelSet(ls commonlabels.LabelSet) Target {
@@ -107,8 +109,8 @@ func NewTargetFromMap(m map[string]string) Target {
 	return NewTargetFromLabelSet(l)
 }

-// TODO(thampiotr): discovery.relabel
-// TODO(thampiotr): get rid of this one!!
+// Labels converts this target into prometheus/prometheus/model/labels.Labels. It is not efficient and should be
+// avoided on a hot path.
 func (t Target) Labels() modellabels.Labels {
 	// This method allocates less than Builder or ScratchBuilder, as proven by benchmarks.
 	lb := make([]modellabels.Label, 0, t.Len())
@@ -123,22 +125,6 @@ func (t Target) Labels() modellabels.Labels {
 	return lb
 }

-// TODO(thampiotr): 13% allocs
-// TODO(thampiotr): prometheus.scrape / distributed_targets
-func (t Target) NonMetaLabels() modellabels.Labels {
-	return t.LabelsWithPredicate(func(key string) bool {
-		return !strings.HasPrefix(key, commonlabels.MetaLabelPrefix)
-	})
-}
-
-// TODO(thampiotr): loki.source.kubernetes / distributed_targets
-func (t Target) SpecificLabels(lbls []string) modellabels.Labels {
-	// TODO(thampiotr): We can cache this!
-	return t.LabelsWithPredicate(func(key string) bool {
-		return slices.Contains(lbls, key)
-	})
-}
-
 func (t Target) NonReservedLabelSet() commonlabels.LabelSet {
 	// This may not be the most optimal way, but this method is NOT a known hot spot at the time of this comment.
 	result := make(commonlabels.LabelSet, t.Len())
@@ -151,22 +137,6 @@ func (t Target) NonReservedLabelSet() commonlabels.LabelSet {
 	return result
 }

-func (t Target) LabelsWithPredicate(pred func(key string) bool) modellabels.Labels {
-	// This method allocates less than Builder or ScratchBuilder, as proven by benchmarks.
-	lb := make([]modellabels.Label, 0, t.Len())
-	t.ForEachLabel(func(key string, value string) bool {
-		if pred == nil || pred(key) {
-			lb = append(lb, modellabels.Label{
-				Name:  key,
-				Value: value,
-			})
-		}
-		return true
-	})
-	slices.SortFunc(lb, func(a, b modellabels.Label) int { return strings.Compare(a.Name, b.Name) })
-	return lb
-}
-
 // ForEachLabel runs f over each key value pair in the Target. f must not modify Target while iterating. If f returns
 // false, the iteration is interrupted. If f returns true, the iteration continues until the last element. ForEachLabel
 // returns true if all the labels were iterated over or false if any call to f has interrupted the iteration.
@@ -213,7 +183,7 @@ func (t Target) Get(key string) (string, bool) {
 }

 // LabelSet converts this target in to a LabelSet
-// NOTE: this is not optimised and should be avoided if possible.
+// Deprecated: this is not optimised and should be avoided if possible.
 func (t Target) LabelSet() commonlabels.LabelSet {
 	merged := make(commonlabels.LabelSet, t.Len())
 	for k, v := range t.group {
@@ -338,30 +308,28 @@ func (t Target) HashLabelsWithPredicate(pred func(key string) bool) uint64 {
 	}
 }

-func (t Target) GroupLabelsHash() uint64 {
-	{
-		// For hash to be deterministic, we need labels order to be deterministic too. Figure this out first.
-		labelsInOrder := make([]string, 0, len(t.group)) // TODO(thampiotr): this can go to object pool?
-		for name := range t.group {
-			labelsInOrder = append(labelsInOrder, string(name))
-		}
-		slices.Sort(labelsInOrder)
-
-		mustGet := func(label string) string {
-			val, ok := t.Get(label)
-			if !ok {
-				panic("label concurrently modified - this is a bug - please report an issue")
-			}
-			return val
-		}
-
-		// TODO(thampiotr): Could do the same optimisation as in modelLabels to use 1KB buffer.
-		h := xxhash.New()
-		for _, label := range labelsInOrder {
-			_, _ = h.WriteString(label)
-			_, _ = h.Write(seps)
-			_, _ = h.WriteString(mustGet(label))
-			_, _ = h.Write(seps)
-		}
-		return h.Sum64()
-	}
-}
+func (t Target) groupLabelsHash() uint64 {
+	// For hash to be deterministic, we need labels order to be deterministic too. Figure this out first.
+	labelsInOrder := make([]string, 0, len(t.group)) // TODO(thampiotr): this can go to object pool?
+	for name := range t.group {
+		labelsInOrder = append(labelsInOrder, string(name))
+	}
+	slices.Sort(labelsInOrder)
+
+	mustGet := func(label string) string {
+		val, ok := t.Get(label)
+		if !ok {
+			panic("label concurrently modified - this is a bug - please report an issue")
+		}
+		return val
+	}
+
+	// TODO(thampiotr): Could do the same optimisation as in modelLabels to use 1KB buffer.
+	h := xxhash.New()
+	for _, label := range labelsInOrder {
+		_, _ = h.WriteString(label)
+		_, _ = h.Write(seps)
+		_, _ = h.WriteString(mustGet(label))
+		_, _ = h.Write(seps)
+	}
+	return h.Sum64()
+}

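The three helpers removed above (NonMetaLabels, SpecificLabels, LabelsWithPredicate) each materialized a sorted modellabels.Labels slice, which the removed TODOs flag as an allocation hot spot. Any caller that still needs that shape can rebuild it from the surviving ForEachLabel and Len methods. A minimal sketch, reusing the import aliases from this diff; the nonMetaLabels helper name is illustrative, not part of the package API:

package example

import (
	"slices"
	"strings"

	commonlabels "github.com/prometheus/common/model"
	modellabels "github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/alloy/internal/component/discovery"
)

// nonMetaLabels reproduces the removed Target.NonMetaLabels on top of the
// public iterator: keep every label not prefixed with "__meta_", then sort
// by name, as the removed implementation did.
func nonMetaLabels(t discovery.Target) modellabels.Labels {
	lb := make([]modellabels.Label, 0, t.Len())
	t.ForEachLabel(func(key string, value string) bool {
		if !strings.HasPrefix(key, commonlabels.MetaLabelPrefix) {
			lb = append(lb, modellabels.Label{Name: key, Value: value})
		}
		return true // continue iterating over the remaining labels
	})
	slices.SortFunc(lb, func(a, b modellabels.Label) int { return strings.Compare(a.Name, b.Name) })
	return lb
}
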
2 changes: 1 addition & 1 deletion internal/component/discovery/target_test.go
@@ -91,7 +91,7 @@ func TestConvertFromNative(t *testing.T) {
 func TestEquals_Basic(t *testing.T) {
 	// NOTE: if we start caching anything as a field, the equality may break. We should test it.
 	t1 := NewTargetFromMap(map[string]string{"hip": "hop", "boom": "bap"})
-	require.Equal(t, 2, t1.Labels().Len())
+	require.Equal(t, 2, t1.Len())
 	tb := NewTargetBuilderFrom(t1)
 	tb.Set("boom", "bap")
 	t2 := tb.Target()

2 changes: 1 addition & 1 deletion internal/component/loki/source/docker/docker.go
@@ -228,7 +228,7 @@ func (c *Component) Update(args component.Arguments) error {

 	promTargets := make([]promTarget, len(newArgs.Targets))
 	for i, target := range newArgs.Targets {
-		labelsCopy := target.LabelSet().Clone() // TODO: do we need to copy the labels here?
+		labelsCopy := target.LabelSet()
 		promTargets[i] = promTarget{labels: labelsCopy, fingerPrint: labelsCopy.Fingerprint()}
 	}

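Dropping the Clone() also resolves the old TODO on that line: the copy was never needed, because LabelSet() builds a brand-new map on every call, so each promTarget already owns its labels. A sketch of why, reconstructed from the LabelSet context visible in the target.go diff above; only the first loop is shown in this commit, so the second loop merging t.own over t.group is an assumption based on the Target fields named in the EmptyTarget hunk:

// Sketch of discovery.Target.LabelSet, per the context lines above.
func (t Target) LabelSet() commonlabels.LabelSet {
	merged := make(commonlabels.LabelSet, t.Len()) // fresh map on every call
	for k, v := range t.group {
		merged[k] = v
	}
	for k, v := range t.own { // assumed: target-specific labels win over group labels
		merged[k] = v
	}
	return merged
}
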
13 changes: 2 additions & 11 deletions internal/component/prometheus/scrape/scrape.go
@@ -332,7 +332,7 @@ func (c *Component) distributeTargets(

 	newLocalTargets := newDistTargets.LocalTargets()
 	c.targetsGauge.Set(float64(len(newLocalTargets)))
-	promNewTargets := c.componentTargetsToPromTargetGroups(jobName, newLocalTargets)
+	promNewTargets := discovery.ComponentTargetsToPromTargetGroups(jobName, newLocalTargets)

 	movedTargets := newDistTargets.MovedToRemoteInstance(oldDistributedTargets)
 	c.movedTargetsCounter.Add(float64(len(movedTargets)))
@@ -479,19 +479,10 @@ func (c *Component) DebugInfo() interface{} {
 	}
 }

-func (c *Component) componentTargetsToPromTargetGroups(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
-	promGroup := &targetgroup.Group{Source: jobName}
-	for _, tg := range tgs {
-		promGroup.Targets = append(promGroup.Targets, tg.LabelSet())
-	}
-
-	return map[string][]*targetgroup.Group{jobName: {promGroup}}
-}
-
 func (c *Component) populatePromLabels(targets []discovery.Target, jobName string, args Arguments) []*scrape.Target {
 	lb := labels.NewBuilder(labels.EmptyLabels())
 	promTargets, errs := scrape.TargetsFromGroup(
-		c.componentTargetsToPromTargetGroups(jobName, targets)[jobName][0],
+		discovery.ComponentTargetsToPromTargetGroups(jobName, targets)[jobName][0],
 		getPromScrapeConfigs(c.opts.ID, args),
 		false, /* noDefaultScrapePort - always false in this component */
 		make([]*scrape.Target, len(targets)), /* targets slice to reuse */

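Both scrape components previously carried a private copy of this conversion; they now share discovery.ComponentTargetsToPromTargetGroups. Unlike the removed single-group helper, the shared function buckets targets by groupLabelsHash (see the target.go hunk above), so one job may map to several targetgroup.Groups. A hedged usage sketch; the job name and loop body are illustrative only:

package example

import (
	"fmt"

	"github.com/grafana/alloy/internal/component/discovery"
)

func printGroups(targets []discovery.Target) {
	// Targets sharing the same discovery group labels land in the same group.
	groups := discovery.ComponentTargetsToPromTargetGroups("my_job", targets)
	for _, g := range groups["my_job"] {
		fmt.Printf("source=%q targets=%d\n", g.Source, len(g.Targets))
	}
}
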
@@ -38,7 +38,7 @@

 	// There is a race condition in prometheus where calls to NewManager can race over a package-global variable when
 	// calling targetMetadataCache.registerManager(m). This is a workaround to prevent this for now.
-	//TODO(thampiotr): Open an issue in prometheus to fix this?
+	// TODO(thampiotr): Open an issue in prometheus to fix this?
 	promManagerMutex sync.Mutex
 )
@@ -260,7 +260,7 @@ func setUpClusterLookup(fakeCluster *fakeCluster, assignment map[peer.Peer][]int
 	fakeCluster.lookupMap = make(map[shard.Key][]peer.Peer)
 	for owningPeer, ownedTargets := range assignment {
 		for _, id := range ownedTargets {
-			fakeCluster.lookupMap[shard.Key(targets[id].Target().NonMetaLabels().Hash())] = []peer.Peer{owningPeer}
+			fakeCluster.lookupMap[shard.Key(targets[id].Target().NonMetaLabelsHash())] = []peer.Peer{owningPeer}
 		}
 	}
 }

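The cluster-lookup test above now hashes the non-meta labels directly rather than materializing a label slice just to call Hash() on it. NonMetaLabelsHash itself is not shown in this commit; a plausible sketch, assuming it reuses the HashLabelsWithPredicate method seen in the target.go hunks with the same predicate the removed NonMetaLabels used:

// Hypothetical implementation - not part of this diff.
func (t Target) NonMetaLabelsHash() uint64 {
	return t.HashLabelsWithPredicate(func(key string) bool {
		return !strings.HasPrefix(key, commonlabels.MetaLabelPrefix)
	})
}
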
11 changes: 1 addition & 10 deletions internal/component/pyroscope/scrape/scrape.go
@@ -8,7 +8,6 @@ import (
 	"time"

 	config_util "github.com/prometheus/common/config"
-	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/discovery/targetgroup"

 	"github.com/grafana/alloy/internal/component/pyroscope"
@@ -322,7 +321,7 @@ func (c *Component) Run(ctx context.Context) error {
 			// NOTE(@tpaschalis) First approach, manually building the
 			// 'clustered' targets implementation every time.
 			ct := discovery.NewDistributedTargets(clusteringEnabled, c.cluster, tgs)
-			promTargets := c.componentTargetsToProm(jobName, ct.LocalTargets())
+			promTargets := discovery.ComponentTargetsToPromTargetGroups(jobName, ct.LocalTargets())

 			select {
 			case targetSetsChan <- promTargets:
@@ -374,14 +373,6 @@ func (c *Component) NotifyClusterChange() {
 	}
 }

-func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
-	promGroup := &targetgroup.Group{Source: jobName, Targets: make([]model.LabelSet, 0, len(tgs))}
-	for _, tg := range tgs {
-		promGroup.Targets = append(promGroup.Targets, tg.LabelSet())
-	}
-	return map[string][]*targetgroup.Group{jobName: {promGroup}}
-}
-
 // DebugInfo implements component.DebugComponent.
 func (c *Component) DebugInfo() interface{} {
 	var res []scrape.TargetStatus

@@ -39,7 +39,7 @@ func TestSyncGroups(t *testing.T) {
 				},
 			},
 			expected: map[string]discovery.Target{
-				"127.0.0.1": discovery.NewEmptyTarget(),
+				"127.0.0.1": discovery.EmptyTarget,
 			},
 		},
 		{
@@ -91,7 +91,7 @@
 				},
 			},
 			expected: map[string]discovery.Target{
-				"127.0.0.1": discovery.NewEmptyTarget(),
+				"127.0.0.1": discovery.EmptyTarget,
 			},
 		},
 	}
