Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New loki.process stage structured_metadata_regex, to move all labels matching a regex to structured metadata #2578

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/component/loki/process/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type StageConfig struct {
ReplaceConfig *ReplaceConfig `alloy:"replace,block,optional"`
StaticLabelsConfig *StaticLabelsConfig `alloy:"static_labels,block,optional"`
StructuredMetadata *LabelsConfig `alloy:"structured_metadata,block,optional"`
StructuredMetadataRegex *StructuredMetadataRegexConfig `alloy:"structured_metadata_regex,block,optional"`
SamplingConfig *SamplingConfig `alloy:"sampling,block,optional"`
TemplateConfig *TemplateConfig `alloy:"template,block,optional"`
TenantConfig *TenantConfig `alloy:"tenant,block,optional"`
Expand Down
6 changes: 6 additions & 0 deletions internal/component/loki/process/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
StageTypeSampling = "sampling"
StageTypeStaticLabels = "static_labels"
StageTypeStructuredMetadata = "structured_metadata"
StageTypeStructuredMetadataRegex = "structured_metadata_regex"
StageTypeTemplate = "template"
StageTypeTenant = "tenant"
StageTypeTimestamp = "timestamp"
Expand Down Expand Up @@ -158,6 +159,11 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
if err != nil {
return nil, err
}
case cfg.StructuredMetadataRegex != nil:
s, err = newStructuredMetadataRegexStage(logger, *cfg.StructuredMetadataRegex)
if err != nil {
return nil, err
}
case cfg.RegexConfig != nil:
s, err = newRegexStage(logger, *cfg.RegexConfig)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package stages

import (
"github.com/go-kit/log"
"regexp"

"github.com/grafana/loki/v3/pkg/logproto"
)

type StructuredMetadataRegexConfig struct {
Regex string `alloy:"regex,attr"`
}

func newStructuredMetadataRegexStage(logger log.Logger, configs StructuredMetadataRegexConfig) (Stage, error) {

re, error := regexp.Compile(configs.Regex)

if error != nil {
return &structuredMetadataRegexStage{}, error
}

return &structuredMetadataRegexStage{
logger: logger,
regex: *re,
}, nil
}

type structuredMetadataRegexStage struct {
logger log.Logger
regex regexp.Regexp
}

func (s *structuredMetadataRegexStage) Name() string {
return StageTypeStructuredMetadataRegex
}

// Cleanup implements Stage.
func (*structuredMetadataRegexStage) Cleanup() {
// no-op
}

func (s *structuredMetadataRegexStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
for labelName, labelValue := range e.Labels {
if s.regex.MatchString(string(labelName)) {
e.StructuredMetadata = append(e.StructuredMetadata, logproto.LabelAdapter{Name: string(labelName), Value: string(labelValue)})
delete(e.Labels, labelName)
}
}
return e
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package stages

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/push"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

var pipelineStagesStructuredMetadataRegexFromStaticLabels = `
stage.static_labels {
values = {"component" = "querier", "pod" = "loki-querier-664f97db8d-qhnwg"}
}
stage.structured_metadata_regex {
regex = "comp.*"
}
`

func Test_structuredMetadataRegexStage(t *testing.T) {
tests := map[string]struct {
pipelineStagesYaml string
logLine string
expectedStructuredMetadata push.LabelsAdapter
expectedLabels model.LabelSet
}{
"expected ": {
pipelineStagesYaml: pipelineStagesStructuredMetadataRegexFromStaticLabels,
logLine: "",
expectedStructuredMetadata: push.LabelsAdapter{push.LabelAdapter{Name: "component", Value: "querier"}},
expectedLabels: model.LabelSet{model.LabelName("pod"): model.LabelValue("loki-querier-664f97db8d-qhnwg")},
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
pl, err := NewPipeline(util_log.Logger, loadConfig(test.pipelineStagesYaml), nil, prometheus.DefaultRegisterer)
require.NoError(t, err)

result := processEntries(pl, newEntry(nil, nil, test.logLine, time.Now()))[0]
require.Equal(t, test.expectedStructuredMetadata, result.StructuredMetadata)
if test.expectedLabels != nil {
require.Equal(t, test.expectedLabels, result.Labels)
} else {
require.Empty(t, result.Labels)
}
})
}
}