diff --git a/internal/component/loki/process/stages/pipeline.go b/internal/component/loki/process/stages/pipeline.go index e6583b4514..fbdac39bb1 100644 --- a/internal/component/loki/process/stages/pipeline.go +++ b/internal/component/loki/process/stages/pipeline.go @@ -38,6 +38,7 @@ type StageConfig struct { ReplaceConfig *ReplaceConfig `alloy:"replace,block,optional"` StaticLabelsConfig *StaticLabelsConfig `alloy:"static_labels,block,optional"` StructuredMetadata *LabelsConfig `alloy:"structured_metadata,block,optional"` + StructuredMetadataRegex *StructuredMetadataRegexConfig `alloy:"structured_metadata_regex,block,optional"` SamplingConfig *SamplingConfig `alloy:"sampling,block,optional"` TemplateConfig *TemplateConfig `alloy:"template,block,optional"` TenantConfig *TenantConfig `alloy:"tenant,block,optional"` diff --git a/internal/component/loki/process/stages/stage.go b/internal/component/loki/process/stages/stage.go index 3787daabac..0f7de5628b 100644 --- a/internal/component/loki/process/stages/stage.go +++ b/internal/component/loki/process/stages/stage.go @@ -40,6 +40,7 @@ const ( StageTypeSampling = "sampling" StageTypeStaticLabels = "static_labels" StageTypeStructuredMetadata = "structured_metadata" + StageTypeStructuredMetadataRegex = "structured_metadata_regex" StageTypeTemplate = "template" StageTypeTenant = "tenant" StageTypeTimestamp = "timestamp" @@ -158,6 +159,11 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh if err != nil { return nil, err } + case cfg.StructuredMetadataRegex != nil: + s, err = newStructuredMetadataRegexStage(logger, *cfg.StructuredMetadataRegex) + if err != nil { + return nil, err + } case cfg.RegexConfig != nil: s, err = newRegexStage(logger, *cfg.RegexConfig) if err != nil { diff --git a/internal/component/loki/process/stages/structured_metadata_regex.go b/internal/component/loki/process/stages/structured_metadata_regex.go new file mode 100644 index 0000000000..b30279a3e8 --- /dev/null +++ b/internal/component/loki/process/stages/structured_metadata_regex.go @@ -0,0 +1,52 @@ +package stages + +import ( + "github.com/go-kit/log" + "regexp" + + "github.com/grafana/loki/v3/pkg/logproto" +) + +type StructuredMetadataRegexConfig struct { + Regex string `alloy:"regex,attr"` +} + +func newStructuredMetadataRegexStage(logger log.Logger, configs StructuredMetadataRegexConfig) (Stage, error) { + + re, error := regexp.Compile(configs.Regex) + + if error != nil { + return &structuredMetadataRegexStage{}, error + } + + return &structuredMetadataRegexStage{ + logger: logger, + regex: *re, + }, nil +} + +type structuredMetadataRegexStage struct { + logger log.Logger + regex regexp.Regexp +} + +func (s *structuredMetadataRegexStage) Name() string { + return StageTypeStructuredMetadataRegex +} + +// Cleanup implements Stage. +func (*structuredMetadataRegexStage) Cleanup() { + // no-op +} + +func (s *structuredMetadataRegexStage) Run(in chan Entry) chan Entry { + return RunWith(in, func(e Entry) Entry { + for labelName, labelValue := range e.Labels { + if s.regex.MatchString(string(labelName)) { + e.StructuredMetadata = append(e.StructuredMetadata, logproto.LabelAdapter{Name: string(labelName), Value: string(labelValue)}) + delete(e.Labels, labelName) + } + } + return e + }) +} diff --git a/internal/component/loki/process/stages/structured_metadata_regex_test.go b/internal/component/loki/process/stages/structured_metadata_regex_test.go new file mode 100644 index 0000000000..3ae6baab90 --- /dev/null +++ b/internal/component/loki/process/stages/structured_metadata_regex_test.go @@ -0,0 +1,52 @@ +package stages + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/push" + util_log "github.com/grafana/loki/v3/pkg/util/log" +) + +var pipelineStagesStructuredMetadataRegexFromStaticLabels = ` +stage.static_labels { + values = {"component" = "querier", "pod" = "loki-querier-664f97db8d-qhnwg"} +} +stage.structured_metadata_regex { + regex = "comp.*" +} +` + +func Test_structuredMetadataRegexStage(t *testing.T) { + tests := map[string]struct { + pipelineStagesYaml string + logLine string + expectedStructuredMetadata push.LabelsAdapter + expectedLabels model.LabelSet + }{ + "expected ": { + pipelineStagesYaml: pipelineStagesStructuredMetadataRegexFromStaticLabels, + logLine: "", + expectedStructuredMetadata: push.LabelsAdapter{push.LabelAdapter{Name: "component", Value: "querier"}}, + expectedLabels: model.LabelSet{model.LabelName("pod"): model.LabelValue("loki-querier-664f97db8d-qhnwg")}, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + pl, err := NewPipeline(util_log.Logger, loadConfig(test.pipelineStagesYaml), nil, prometheus.DefaultRegisterer) + require.NoError(t, err) + + result := processEntries(pl, newEntry(nil, nil, test.logLine, time.Now()))[0] + require.Equal(t, test.expectedStructuredMetadata, result.StructuredMetadata) + if test.expectedLabels != nil { + require.Equal(t, test.expectedLabels, result.Labels) + } else { + require.Empty(t, result.Labels) + } + }) + } +}