Skip to content

Commit

Permalink
New loki.process stage structured_metadata_regex, to move all labels …
Browse files Browse the repository at this point in the history
…matching a regex to structured metadata
  • Loading branch information
akevdmeer committed Jan 30, 2025
1 parent 0bb74cc commit 61af452
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 0 deletions.
1 change: 1 addition & 0 deletions internal/component/loki/process/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type StageConfig struct {
ReplaceConfig *ReplaceConfig `alloy:"replace,block,optional"`
StaticLabelsConfig *StaticLabelsConfig `alloy:"static_labels,block,optional"`
StructuredMetadata *LabelsConfig `alloy:"structured_metadata,block,optional"`
StructuredMetadataRegex *StructuredMetadataRegexConfig `alloy:"structured_metadata_regex,block,optional"`
SamplingConfig *SamplingConfig `alloy:"sampling,block,optional"`
TemplateConfig *TemplateConfig `alloy:"template,block,optional"`
TenantConfig *TenantConfig `alloy:"tenant,block,optional"`
Expand Down
6 changes: 6 additions & 0 deletions internal/component/loki/process/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
StageTypeSampling = "sampling"
StageTypeStaticLabels = "static_labels"
StageTypeStructuredMetadata = "structured_metadata"
StageTypeStructuredMetadataRegex = "structured_metadata_regex"
StageTypeTemplate = "template"
StageTypeTenant = "tenant"
StageTypeTimestamp = "timestamp"
Expand Down Expand Up @@ -158,6 +159,11 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
if err != nil {
return nil, err
}
case cfg.StructuredMetadataRegex != nil:
s, err = newStructuredMetadataRegexStage(logger, *cfg.StructuredMetadataRegex)
if err != nil {
return nil, err
}
case cfg.RegexConfig != nil:
s, err = newRegexStage(logger, *cfg.RegexConfig)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package stages

import (
"github.com/go-kit/log"
"regexp"

"github.com/grafana/loki/v3/pkg/logproto"
)

type StructuredMetadataRegexConfig struct {
Regex string `alloy:"regex,attr"`
}

func newStructuredMetadataRegexStage(logger log.Logger, configs StructuredMetadataRegexConfig) (Stage, error) {

re, error := regexp.Compile(configs.Regex)

if error != nil {
return &structuredMetadataRegexStage{}, error
}

return &structuredMetadataRegexStage{
logger: logger,
regex: *re,
}, nil
}

type structuredMetadataRegexStage struct {
logger log.Logger
regex regexp.Regexp
}

func (s *structuredMetadataRegexStage) Name() string {
return StageTypeStructuredMetadataRegex
}

// Cleanup implements Stage.
func (*structuredMetadataRegexStage) Cleanup() {
// no-op
}

func (s *structuredMetadataRegexStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
for labelName, labelValue := range e.Labels {
if s.regex.MatchString(string(labelName)) {
e.StructuredMetadata = append(e.StructuredMetadata, logproto.LabelAdapter{Name: string(labelName), Value: string(labelValue)})
delete(e.Labels, labelName)
}
}
return e
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package stages

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/push"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

var pipelineStagesStructuredMetadataRegexFromStaticLabels = `
stage.static_labels {
values = {"component" = "querier", "pod" = "loki-querier-664f97db8d-qhnwg"}
}
stage.structured_metadata_regex {
regex = "comp.*"
}
`

func Test_structuredMetadataRegexStage(t *testing.T) {
tests := map[string]struct {
pipelineStagesYaml string
logLine string
expectedStructuredMetadata push.LabelsAdapter
expectedLabels model.LabelSet
}{
"expected ": {
pipelineStagesYaml: pipelineStagesStructuredMetadataRegexFromStaticLabels,
logLine: "",
expectedStructuredMetadata: push.LabelsAdapter{push.LabelAdapter{Name: "component", Value: "querier"}},
expectedLabels: model.LabelSet{model.LabelName("pod"): model.LabelValue("loki-querier-664f97db8d-qhnwg")},
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
pl, err := NewPipeline(util_log.Logger, loadConfig(test.pipelineStagesYaml), nil, prometheus.DefaultRegisterer)
require.NoError(t, err)

result := processEntries(pl, newEntry(nil, nil, test.logLine, time.Now()))[0]
require.Equal(t, test.expectedStructuredMetadata, result.StructuredMetadata)
if test.expectedLabels != nil {
require.Equal(t, test.expectedLabels, result.Labels)
} else {
require.Empty(t, result.Labels)
}
})
}
}

0 comments on commit 61af452

Please sign in to comment.