diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f5d830895..6d97f51da2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,10 +12,12 @@ Main (unreleased) ### Features -- Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb) +- (_Experimental_) Add a `stage.windowsevent` block in the `loki.process` component. This aims to replace the existing `stage.eventlogmessage`. (@wildum) ### Enhancements +- Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb) + - (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco) - (_Experimental_) Improve parsing of truncated queries in `database_observability.mysql` (@cristiangreco) diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index 31043c12b9..4041b512ee 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -69,6 +69,7 @@ The following blocks are supported inside the definition of `loki.process`: | stage.template | [stage.template][] | Configures a `template` processing stage. | no | | stage.tenant | [stage.tenant][] | Configures a `tenant` processing stage. | no | | stage.timestamp | [stage.timestamp][] | Configures a `timestamp` processing stage. | no | +| stage.windowsevent | [stage.windowsevent][] | Configures a `windowsevent` processing stage. | no | A user can provide any number of these stage blocks nested inside `loki.process`; these will run in order of appearance in the configuration file. @@ -98,6 +99,7 @@ A user can provide any number of these stage blocks nested inside `loki.process` [stage.template]: #stagetemplate-block [stage.tenant]: #stagetenant-block [stage.timestamp]: #stagetimestamp-block +[stage.windowsevent]: #stagewindowsevent-block ### stage.cri block @@ -242,6 +244,8 @@ stage.drop { ### stage.eventlogmessage block +Deprecated in favor of the [stage.windowsevent block][stage.windowsevent]. + The `eventlogmessage` stage extracts data from the Message string that appears in the Windows Event Log. The following arguments are supported: @@ -1694,6 +1698,92 @@ loki.process "example" { The `json` stage extracts the IP address from the `client_ip` key in the log line. Then the extracted `ip` value is given as source to geoip stage. The geoip stage performs a lookup on the IP and populates the shared map with the data from the city database results in addition to the custom lookups. Lastly, the custom lookup fields from the shared map are added as labels. +### stage.windowsevent block + +The `windowsevent` stage extracts data from the message string in the Windows Event Log. + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|-----------------------|----------|--------------------------------------------------------|-----------|----------| +| `source` | `string` | Name of the field in the extracted data to parse. | `message` | no | +| `overwrite_existing` | `bool` | Whether to overwrite existing extracted data fields. | `false` | no | +| `drop_invalid_labels` | `bool` | Whether to drop fields that are not valid label names. | `false` | no | + +When `overwrite_existing` is set to `true`, the stage overwrites existing extracted data fields with the same name. +If set to `false`, the `_extracted` suffix is appended to an existing field name. + +When `drop_invalid_labels` is set to `true`, the stage drops fields that aren't valid label names. +If set to `false`, the stage will automatically convert them into valid labels, replacing invalid characters with underscores. + +The `windowsevent` stage expects the message to be structured in sections that are split by empty lines. + +The first section of the input is treated as a whole block and stored in the extracted map with the key `Description`. + +Sections following the Description are expected to contain key-value pairs in the format key:value. + +If the first line of a section has no value, for example "Subject:", the key will act as a prefix for subsequent keys in the same section. + +If a line within a section does not include the `:` symbol, it is considered part of the previous entry's value. The line is appended to the previous value, separated by a comma. + +Lines in a section without a preceding valid entry (key-value pair) are ignored and discarded. + +#### Example with `loki.source.windowsevent` + +```alloy +loki.source.windowsevent "security" { + eventlog_name = "Security" + forward_to = [loki.process.default.receiver] +} + +loki.process "default" { + forward_to = [loki.write.default.receiver] + + stage.json { + expressions = { + message = "", + Overwritten = "", + } + } + + stage.windowsevent { + source = "message" + overwrite_existing = true + } + + stage.labels { + values = { + Description = "", + Subject_SecurityID = "", + ReadOP = "Subject_ReadOperation", + } + } +} +``` + +The `loki.source.windowsevent` component forwards Windows security events to the `loki.process` component. + +Given the following event: +``` +{"event_id": 1, "Overwritten": "old", "message": ""Special privileges assigned to new logon.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-1-1\r\n\tAccount Name:\t\tSYSTEM\r\n\tAccount Domain:\t\tNT AUTHORITY\r\n\tLogon ID:\t\t0xAAA\r\n\r\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\r\n\t\t\tSeTcbPrivilege\r\n\t\t\tSeSecurityPrivilege\r\n\t\t\tSeTakeOwnershipPrivilege\r\n\t\t\tSeLoadDriverPrivilege\r\n\t\t\tSeBackupPrivilege\r\n\t\t\tSeRestorePrivilege\r\n\t\t\tSeDebugPrivilege\r\n\t\t\tSeAuditPrivilege\r\n\t\t\tSeSystemEnvironmentPrivilege\r\n\t\t\tSeImpersonatePrivilege\r\n\t\t\tSeDelegateSessionUserImpersonatePrivilege""} +``` + +The `json` stage would create the following key-value pairs in the set of extracted data: + +- `message`: `"Special privileges assigned to new logon.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-1-1\r\n\tAccount Name:\t\tSYSTEM\r\n\tAccount Domain:\t\tNT AUTHORITY\r\n\tLogon ID:\t\t0xAAA\r\n\r\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\r\n\t\t\tSeTcbPrivilege\r\n\t\t\tSeSecurityPrivilege"` +- `Overwritten`: `old` + +The `windowsevent` stage will parse the value of `message` from the extracted data and append/overwrite the following key-value pairs to the set of extracted data: + +- `Description`: "Special privileges assigned to new logon.", +- `Subject_SecurityID`: "S-1-1-1", +- `Subject_AccountName`: "SYSTEM", +- `Subject_AccountDomain`: "NT AUTHORITY", +- `Subject_LogonID`: "0xAAA", +- `Privileges`: "SeAssignPrimaryTokenPrivilege,SeTcbPrivilege,SeSecurityPrivilege", + +Finally the `labels` stage will use the extracted values `Description`, `Subject_SecurityID` and `Subject_ReadOperation` to add them as labels of the log entry before forwarding it to a `loki.write` component. + ## Exported fields The following fields are exported and can be referenced by other components: diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index f701652273..ae858232f3 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -141,7 +141,7 @@ func (c *Component) Update(args component.Arguments) error { c.entryHandler.Stop() } - pipeline, err := stages.NewPipeline(c.opts.Logger, newArgs.Stages, &c.opts.ID, c.opts.Registerer) + pipeline, err := stages.NewPipeline(c.opts.Logger, newArgs.Stages, &c.opts.ID, c.opts.Registerer, c.opts.MinStability) if err != nil { return err } diff --git a/internal/component/loki/process/stages/decolorize_test.go b/internal/component/loki/process/stages/decolorize_test.go index f1642e599a..fb0718ce61 100644 --- a/internal/component/loki/process/stages/decolorize_test.go +++ b/internal/component/loki/process/stages/decolorize_test.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "github.com/grafana/alloy/internal/featuregate" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -40,7 +41,7 @@ func TestPipeline_Decolorize(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } diff --git a/internal/component/loki/process/stages/drop_test.go b/internal/component/loki/process/stages/drop_test.go index e77293f45d..376b3fdcfd 100644 --- a/internal/component/loki/process/stages/drop_test.go +++ b/internal/component/loki/process/stages/drop_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" ) @@ -433,7 +434,7 @@ func TestDropPipeline(t *testing.T) { registry := prometheus.NewRegistry() plName := "test_drop_pipeline" logger := util.TestAlloyLogger(t) - pl, err := NewPipeline(logger, loadConfig(testDropAlloy), &plName, registry) + pl, err := NewPipeline(logger, loadConfig(testDropAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) out := processEntries(pl, newEntry(nil, nil, testMatchLogLineApp1, time.Now()), diff --git a/internal/component/loki/process/stages/eventlogmessage_test.go b/internal/component/loki/process/stages/eventlogmessage_test.go index 0ed8eb802b..45b4d6ddfe 100644 --- a/internal/component/loki/process/stages/eventlogmessage_test.go +++ b/internal/component/loki/process/stages/eventlogmessage_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/syntax" ) @@ -107,7 +108,7 @@ func TestEventLogMessage_simple(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) assert.NoError(t, err, "Expected pipeline creation to not result in error") out := processEntries(pl, newEntry(map[string]interface{}{ @@ -267,7 +268,7 @@ func TestEventLogMessage_Real(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) assert.NoError(t, err, "Expected pipeline creation to not result in error") out := processEntries(pl, newEntry(map[string]interface{}{testData.sourcekey: testData.msgdata}, nil, testData.msgdata, time.Now()))[0] @@ -323,7 +324,7 @@ func TestEventLogMessage_invalid(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) assert.NoError(t, err, "Expected pipeline creation to not result in error") out := processEntries(pl, newEntry(map[string]interface{}{testData.sourcekey: testData.msgdata}, nil, testData.msgdata, time.Now()))[0] @@ -335,7 +336,7 @@ func TestEventLogMessage_invalid(t *testing.T) { func TestEventLogMessage_invalidString(t *testing.T) { t.Parallel() - pl, err := NewPipeline(util_log.Logger, loadConfig(testEvtLogMsgYamlDefaults), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(util_log.Logger, loadConfig(testEvtLogMsgYamlDefaults), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) assert.NoError(t, err, "Expected pipeline creation to not result in error") out := processEntries(pl, newEntry(map[string]interface{}{"message": nil}, nil, "", time.Now())) diff --git a/internal/component/loki/process/stages/extensions.go b/internal/component/loki/process/stages/extensions.go index d01662acb9..3c7a4262bb 100644 --- a/internal/component/loki/process/stages/extensions.go +++ b/internal/component/loki/process/stages/extensions.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/syntax" "github.com/prometheus/client_golang/prometheus" @@ -55,7 +56,7 @@ func (args *CRIConfig) Validate() error { // NewDocker creates a predefined pipeline for parsing entries in the Docker // json log format. -func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) { +func NewDocker(logger log.Logger, registerer prometheus.Registerer, minStability featuregate.Stability) (Stage, error) { stages := []StageConfig{ { JSONConfig: &JSONConfig{ @@ -83,7 +84,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro }, }, } - return NewPipeline(logger, stages, nil, registerer) + return NewPipeline(logger, stages, nil, registerer, minStability) } type cri struct { @@ -169,7 +170,7 @@ func (c *cri) ensureTruncateIfRequired(e *Entry) { // NewCRI creates a predefined pipeline for parsing entries in the CRI log // format. -func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registerer) (Stage, error) { +func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registerer, minStability featuregate.Stability) (Stage, error) { base := []StageConfig{ { RegexConfig: &RegexConfig{ @@ -199,7 +200,7 @@ func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registere }, } - p, err := NewPipeline(logger, base, nil, registerer) + p, err := NewPipeline(logger, base, nil, registerer, minStability) if err != nil { return nil, err } diff --git a/internal/component/loki/process/stages/extensions_test.go b/internal/component/loki/process/stages/extensions_test.go index c0793a7f54..fba4352ebf 100644 --- a/internal/component/loki/process/stages/extensions_test.go +++ b/internal/component/loki/process/stages/extensions_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -68,7 +69,7 @@ func TestNewDocker(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := NewDocker(util_log.Logger, prometheus.DefaultRegisterer) + p, err := NewDocker(util_log.Logger, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatalf("failed to create Docker parser: %s", err) } @@ -192,7 +193,7 @@ func TestCRI_tags(t *testing.T) { MaxPartialLineSize: tt.maxPartialLineSize, MaxPartialLineSizeTruncate: tt.maxPartialLineSizeTruncate, } - p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer) + p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) got := make([]string, 0) @@ -275,7 +276,7 @@ func TestNewCri(t *testing.T) { t.Run(tName, func(t *testing.T) { t.Parallel() cfg := DefaultCRIConfig - p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer) + p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatalf("failed to create CRI parser: %s", err) } diff --git a/internal/component/loki/process/stages/json_test.go b/internal/component/loki/process/stages/json_test.go index 79bb01c2ee..b8063ee536 100644 --- a/internal/component/loki/process/stages/json_test.go +++ b/internal/component/loki/process/stages/json_test.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" ) @@ -79,7 +80,7 @@ func TestPipeline_JSON(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) assert.NoError(t, err, "Expected pipeline creation to not result in error") out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0] assert.Equal(t, testData.expectedExtract, out.Extracted) @@ -341,7 +342,7 @@ func TestJSONParser_Parse(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := New(logger, nil, tt.config, nil) + p, err := New(logger, nil, tt.config, nil, featuregate.StabilityGenerallyAvailable) assert.NoError(t, err, "failed to create json parser: %s", err) out := processEntries(p, newEntry(tt.extracted, nil, tt.entry, time.Now()))[0] diff --git a/internal/component/loki/process/stages/labels_test.go b/internal/component/loki/process/stages/labels_test.go index 8cb0ac7cf1..a32b2ab288 100644 --- a/internal/component/loki/process/stages/labels_test.go +++ b/internal/component/loki/process/stages/labels_test.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" + "github.com/grafana/alloy/internal/featuregate" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -40,7 +41,7 @@ var testLabelsLogLineWithMissingKey = ` ` func TestLabelsPipeline_Labels(t *testing.T) { - pl, err := NewPipeline(util_log.Logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(util_log.Logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } @@ -57,7 +58,7 @@ func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) { var buf bytes.Buffer w := log.NewSyncWriter(&buf) logger := log.NewLogfmtLogger(w) - pl, err := NewPipeline(logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } diff --git a/internal/component/loki/process/stages/limit_test.go b/internal/component/loki/process/stages/limit_test.go index 507d0eb42b..521a655b96 100644 --- a/internal/component/loki/process/stages/limit_test.go +++ b/internal/component/loki/process/stages/limit_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -58,7 +59,7 @@ var plName = "testPipeline" // TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline func TestLimitWaitPipeline(t *testing.T) { registry := prometheus.NewRegistry() - pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitAlloy), &plName, registry) + pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable) logs := make([]Entry, 0) logCount := 5 for i := 0; i < logCount; i++ { @@ -76,7 +77,7 @@ func TestLimitWaitPipeline(t *testing.T) { // TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline func TestLimitDropPipeline(t *testing.T) { registry := prometheus.NewRegistry() - pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropAlloy), &plName, registry) + pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable) logs := make([]Entry, 0) logCount := 10 for i := 0; i < logCount; i++ { @@ -94,7 +95,7 @@ func TestLimitDropPipeline(t *testing.T) { // TestLimitByLabelPipeline is used to verify we properly parse the yaml config and create a working pipeline func TestLimitByLabelPipeline(t *testing.T) { registry := prometheus.NewRegistry() - pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitByLabelAlloy), &plName, registry) + pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitByLabelAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable) logs := make([]Entry, 0) logCount := 5 for i := 0; i < logCount; i++ { diff --git a/internal/component/loki/process/stages/logfmt_test.go b/internal/component/loki/process/stages/logfmt_test.go index d7f3ca3d84..256e399d9c 100644 --- a/internal/component/loki/process/stages/logfmt_test.go +++ b/internal/component/loki/process/stages/logfmt_test.go @@ -9,6 +9,7 @@ import ( util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" ) @@ -62,7 +63,7 @@ func TestLogfmt(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) assert.NoError(t, err) out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0] assert.Equal(t, testData.expectedExtract, out.Extracted) @@ -236,7 +237,7 @@ func TestLogfmtParser_Parse(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := New(logger, nil, StageConfig{LogfmtConfig: &tt.config}, nil) + p, err := New(logger, nil, StageConfig{LogfmtConfig: &tt.config}, nil, featuregate.StabilityGenerallyAvailable) assert.NoError(t, err) out := processEntries(p, newEntry(tt.extracted, nil, tt.entry, time.Now()))[0] diff --git a/internal/component/loki/process/stages/match.go b/internal/component/loki/process/stages/match.go index 8cb71e2e27..5dbab136b3 100644 --- a/internal/component/loki/process/stages/match.go +++ b/internal/component/loki/process/stages/match.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/loki/v3/clients/pkg/logentry/logql" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -60,7 +61,7 @@ func validateMatcherConfig(cfg *MatchConfig) (logql.Expr, error) { } // newMatcherStage creates a new matcherStage from config -func newMatcherStage(logger log.Logger, jobName *string, config MatchConfig, registerer prometheus.Registerer) (Stage, error) { +func newMatcherStage(logger log.Logger, jobName *string, config MatchConfig, registerer prometheus.Registerer, minStability featuregate.Stability) (Stage, error) { selector, err := validateMatcherConfig(&config) if err != nil { return nil, err @@ -75,7 +76,7 @@ func newMatcherStage(logger log.Logger, jobName *string, config MatchConfig, reg var pl *Pipeline if config.Action == MatchActionKeep { var err error - pl, err = NewPipeline(logger, config.Stages, nPtr, registerer) + pl, err = NewPipeline(logger, config.Stages, nPtr, registerer, minStability) if err != nil { return nil, fmt.Errorf("%v: %w", err, fmt.Errorf("match stage failed to create pipeline from config: %v", config)) } diff --git a/internal/component/loki/process/stages/match_test.go b/internal/component/loki/process/stages/match_test.go index 3a67c064be..0640c7891f 100644 --- a/internal/component/loki/process/stages/match_test.go +++ b/internal/component/loki/process/stages/match_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" ) @@ -67,7 +68,7 @@ func TestMatchStage(t *testing.T) { registry := prometheus.NewRegistry() plName := "test_match_pipeline" logger := util.TestAlloyLogger(t) - pl, err := NewPipeline(logger, loadConfig(testMatchAlloy), &plName, registry) + pl, err := NewPipeline(logger, loadConfig(testMatchAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } @@ -162,7 +163,7 @@ func TestMatcher(t *testing.T) { "", } logger := util.TestAlloyLogger(t) - s, err := newMatcherStage(logger, nil, matchConfig, prometheus.DefaultRegisterer) + s, err := newMatcherStage(logger, nil, matchConfig, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if (err != nil) != tt.wantErr { t.Errorf("withMatcher() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/internal/component/loki/process/stages/metric_test.go b/internal/component/loki/process/stages/metric_test.go index 47ae5651fe..cd810663a2 100644 --- a/internal/component/loki/process/stages/metric_test.go +++ b/internal/component/loki/process/stages/metric_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/alloy/internal/component/loki/process/metric" + "github.com/grafana/alloy/internal/featuregate" ) var testMetricAlloy = ` @@ -109,7 +110,7 @@ loki_process_custom_total_lines_count{test="app"} 2 func TestMetricsPipeline(t *testing.T) { registry := prometheus.NewRegistry() - pl, err := NewPipeline(util_log.Logger, loadConfig(testMetricAlloy), nil, registry) + pl, err := NewPipeline(util_log.Logger, loadConfig(testMetricAlloy), nil, registry, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } @@ -147,7 +148,7 @@ stage.metrics { action = "set" } } ` - pl, err := NewPipeline(util_log.Logger, loadConfig(testConfig), nil, registry) + pl, err := NewPipeline(util_log.Logger, loadConfig(testConfig), nil, registry, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } @@ -167,7 +168,7 @@ func TestPipelineWithMissingKey_Metrics(t *testing.T) { var buf bytes.Buffer w := log.NewSyncWriter(&buf) logger := log.NewLogfmtLogger(w) - pl, err := NewPipeline(logger, loadConfig(testMetricAlloy), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testMetricAlloy), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } @@ -206,7 +207,7 @@ loki_process_custom_loki_count 1 func TestMetricsWithDropInPipeline(t *testing.T) { registry := prometheus.NewRegistry() - pl, err := NewPipeline(util_log.Logger, loadConfig(testMetricWithDropAlloy), nil, registry) + pl, err := NewPipeline(util_log.Logger, loadConfig(testMetricWithDropAlloy), nil, registry, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } @@ -348,7 +349,7 @@ loki_process_custom_payload_size_bytes_count{test="app"} 1 } { t.Run(name, func(t *testing.T) { registry := prometheus.NewRegistry() - pl, err := NewPipeline(util_log.Logger, loadConfig(tc.promtailConfig), nil, registry) + pl, err := NewPipeline(util_log.Logger, loadConfig(tc.promtailConfig), nil, registry, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) in := make(chan Entry) out := pl.Run(in) @@ -448,15 +449,15 @@ func TestMetricStage_Process(t *testing.T) { }}} registry := prometheus.NewRegistry() - jsonStage, err := New(util_log.Logger, nil, jsonStageConfig, registry) + jsonStage, err := New(util_log.Logger, nil, jsonStageConfig, registry, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } - regexStage, err := New(util_log.Logger, nil, regexStageConfig, registry) + regexStage, err := New(util_log.Logger, nil, regexStageConfig, registry, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } - metricStage, err := New(util_log.Logger, nil, metricsStageConfig, registry) + metricStage, err := New(util_log.Logger, nil, metricsStageConfig, registry, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } diff --git a/internal/component/loki/process/stages/output_test.go b/internal/component/loki/process/stages/output_test.go index 925fa15ede..ba993b4b91 100644 --- a/internal/component/loki/process/stages/output_test.go +++ b/internal/component/loki/process/stages/output_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -44,7 +45,7 @@ var testOutputLogLineWithMissingKey = ` func TestPipeline_Output(t *testing.T) { logger := util.TestAlloyLogger(t) - pl, err := NewPipeline(logger, loadConfig(testOutputAlloy), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testOutputAlloy), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) out := processEntries(pl, newEntry(nil, nil, testOutputLogLine, time.Now()))[0] @@ -55,7 +56,7 @@ func TestPipelineWithMissingKey_Output(t *testing.T) { var buf bytes.Buffer w := log.NewSyncWriter(&buf) logger := log.NewLogfmtLogger(w) - pl, err := NewPipeline(logger, loadConfig(testOutputAlloy), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testOutputAlloy), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) _ = processEntries(pl, newEntry(nil, nil, testOutputLogLineWithMissingKey, time.Now())) diff --git a/internal/component/loki/process/stages/pack_test.go b/internal/component/loki/process/stages/pack_test.go index a1db081001..22b4e44427 100644 --- a/internal/component/loki/process/stages/pack_test.go +++ b/internal/component/loki/process/stages/pack_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" ) @@ -39,7 +40,7 @@ func TestPackPipeline(t *testing.T) { registry := prometheus.NewRegistry() plName := "test_pack_pipeline" logger := util.TestAlloyLogger(t) - pl, err := NewPipeline(logger, loadConfig(testPackAlloy), &plName, registry) + pl, err := NewPipeline(logger, loadConfig(testPackAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) l1Lbls := model.LabelSet{ diff --git a/internal/component/loki/process/stages/pipeline.go b/internal/component/loki/process/stages/pipeline.go index e6583b4514..f4404941c0 100644 --- a/internal/component/loki/process/stages/pipeline.go +++ b/internal/component/loki/process/stages/pipeline.go @@ -10,6 +10,7 @@ import ( "golang.org/x/time/rate" "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/featuregate" ) // StageConfig defines a single stage in a processing pipeline. @@ -42,6 +43,7 @@ type StageConfig struct { TemplateConfig *TemplateConfig `alloy:"template,block,optional"` TenantConfig *TenantConfig `alloy:"tenant,block,optional"` TimestampConfig *TimestampConfig `alloy:"timestamp,block,optional"` + WindowsEventConfig *WindowsEventConfig `alloy:"windowsevent,block,optional"` } var rateLimiter *rate.Limiter @@ -57,10 +59,10 @@ type Pipeline struct { } // NewPipeline creates a new log entry pipeline from a configuration -func NewPipeline(logger log.Logger, stages []StageConfig, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) { +func NewPipeline(logger log.Logger, stages []StageConfig, jobName *string, registerer prometheus.Registerer, minStability featuregate.Stability) (*Pipeline, error) { st := []Stage{} for _, stage := range stages { - newStage, err := New(logger, jobName, stage, registerer) + newStage, err := New(logger, jobName, stage, registerer, minStability) if err != nil { return nil, fmt.Errorf("invalid stage config %w", err) } diff --git a/internal/component/loki/process/stages/pipeline_test.go b/internal/component/loki/process/stages/pipeline_test.go index 0a8dc08f73..c990da1cc6 100644 --- a/internal/component/loki/process/stages/pipeline_test.go +++ b/internal/component/loki/process/stages/pipeline_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/grafana/alloy/internal/component/common/loki/client/fake" + "github.com/grafana/alloy/internal/featuregate" "github.com/go-kit/log" "github.com/grafana/loki/v3/pkg/logproto" @@ -54,7 +55,7 @@ func loadConfig(yml string) []StageConfig { } func newPipelineFromConfig(cfg, name string) (*Pipeline, error) { - return NewPipeline(util_log.Logger, loadConfig(cfg), &name, prometheus.DefaultRegisterer) + return NewPipeline(util_log.Logger, loadConfig(cfg), &name, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) } // TODO(@tpaschalis) Comment these out until we port over the remaining @@ -101,7 +102,7 @@ stage.output { }` func TestNewPipeline(t *testing.T) { - p, err := NewPipeline(util_log.Logger, loadConfig(testMultiStageAlloy), nil, prometheus.DefaultRegisterer) + p, err := NewPipeline(util_log.Logger, loadConfig(testMultiStageAlloy), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { panic(err) } @@ -213,7 +214,7 @@ func TestPipeline_Process(t *testing.T) { err := syntax.Unmarshal([]byte(tt.config), &config) require.NoError(t, err) - p, err := NewPipeline(util_log.Logger, loadConfig(tt.config), nil, prometheus.DefaultRegisterer) + p, err := NewPipeline(util_log.Logger, loadConfig(tt.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) out := processEntries(p, newEntry(nil, tt.initialLabels, tt.entry, tt.t))[0] @@ -255,7 +256,7 @@ func BenchmarkPipeline(b *testing.B) { } for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { - pl, err := NewPipeline(bm.logger, bm.stgs, nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(bm.logger, bm.stgs, nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { panic(err) } @@ -280,7 +281,7 @@ func BenchmarkPipeline(b *testing.B) { func TestPipeline_Wrap(t *testing.T) { now := time.Now() - p, err := NewPipeline(util_log.Logger, loadConfig(testMultiStageAlloy), nil, prometheus.DefaultRegisterer) + p, err := NewPipeline(util_log.Logger, loadConfig(testMultiStageAlloy), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { panic(err) } diff --git a/internal/component/loki/process/stages/regex_test.go b/internal/component/loki/process/stages/regex_test.go index ecd4d3486c..6bfca7b9e6 100644 --- a/internal/component/loki/process/stages/regex_test.go +++ b/internal/component/loki/process/stages/regex_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -108,7 +109,7 @@ func TestPipeline_Regex(t *testing.T) { t.Parallel() logger := util.TestAlloyLogger(t) - pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } @@ -123,7 +124,7 @@ func TestPipelineWithMissingKey_Regex(t *testing.T) { var buf bytes.Buffer w := log.NewSyncWriter(&buf) logger := log.NewLogfmtLogger(w) - pl, err := NewPipeline(logger, loadConfig(testRegexAlloySourceWithMissingKey), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testRegexAlloySourceWithMissingKey), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } @@ -299,7 +300,7 @@ func TestRegexParser_Parse(t *testing.T) { t.Run(tName, func(t *testing.T) { t.Parallel() logger := util.TestAlloyLogger(t) - p, err := New(logger, nil, StageConfig{RegexConfig: &tt.config}, nil) + p, err := New(logger, nil, StageConfig{RegexConfig: &tt.config}, nil, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatalf("failed to create regex parser: %s", err) } @@ -324,7 +325,7 @@ func BenchmarkRegexStage(b *testing.B) { for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { logger := util.TestAlloyLogger(b) - stage, err := New(logger, nil, StageConfig{RegexConfig: &bm.config}, nil) + stage, err := New(logger, nil, StageConfig{RegexConfig: &bm.config}, nil, featuregate.StabilityGenerallyAvailable) if err != nil { panic(err) } diff --git a/internal/component/loki/process/stages/replace_test.go b/internal/component/loki/process/stages/replace_test.go index 021961180f..2bdbaab49c 100644 --- a/internal/component/loki/process/stages/replace_test.go +++ b/internal/component/loki/process/stages/replace_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -155,7 +156,7 @@ func TestReplace(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } diff --git a/internal/component/loki/process/stages/sampling_test.go b/internal/component/loki/process/stages/sampling_test.go index 11502d8184..c78dc1d84d 100644 --- a/internal/component/loki/process/stages/sampling_test.go +++ b/internal/component/loki/process/stages/sampling_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -20,7 +21,7 @@ stage.sampling { func TestSamplingPipeline(t *testing.T) { registry := prometheus.NewRegistry() - pl, err := NewPipeline(util_log.Logger, loadConfig(testSamplingAlloy), &plName, registry) + pl, err := NewPipeline(util_log.Logger, loadConfig(testSamplingAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) entries := make([]Entry, 0) diff --git a/internal/component/loki/process/stages/stage.go b/internal/component/loki/process/stages/stage.go index 3787daabac..e6040b5403 100644 --- a/internal/component/loki/process/stages/stage.go +++ b/internal/component/loki/process/stages/stage.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/featuregate" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "gopkg.in/yaml.v2" @@ -43,8 +44,14 @@ const ( StageTypeTemplate = "template" StageTypeTenant = "tenant" StageTypeTimestamp = "timestamp" + StageTypeWindowsEvent = "windowsevent" ) +// Add stages that are not GA. Stages that are not specified here are considered GA. +var stagesUnstable = map[string]featuregate.Stability{ + StageTypeWindowsEvent: featuregate.StabilityExperimental, +} + // Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated // timestamp and log entry type Processor interface { @@ -111,20 +118,28 @@ func toStage(p Processor) Stage { } } +func checkFeatureStability(stageName string, minStability featuregate.Stability) error { + blockStability, exist := stagesUnstable[stageName] + if exist { + return featuregate.CheckAllowed(blockStability, minStability, fmt.Sprintf("stage %q", stageName)) + } + return nil +} + // New creates a new stage for the given type and configuration. -func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometheus.Registerer) (Stage, error) { +func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometheus.Registerer, minStability featuregate.Stability) (Stage, error) { var ( s Stage err error ) switch { case cfg.DockerConfig != nil: - s, err = NewDocker(logger, registerer) + s, err = NewDocker(logger, registerer, minStability) if err != nil { return nil, err } case cfg.CRIConfig != nil: - s, err = NewCRI(logger, *cfg.CRIConfig, registerer) + s, err = NewCRI(logger, *cfg.CRIConfig, registerer, minStability) if err != nil { return nil, err } @@ -174,7 +189,7 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh return nil, err } case cfg.MatchConfig != nil: - s, err = newMatcherStage(logger, jobName, *cfg.MatchConfig, registerer) + s, err = newMatcherStage(logger, jobName, *cfg.MatchConfig, registerer, minStability) if err != nil { return nil, err } @@ -239,9 +254,16 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh s = newSamplingStage(logger, *cfg.SamplingConfig, registerer) case cfg.EventLogMessageConfig != nil: s = newEventLogMessageStage(logger, cfg.EventLogMessageConfig) + case cfg.WindowsEventConfig != nil: + s = newWindowsEventStage(logger, cfg.WindowsEventConfig) default: panic(fmt.Sprintf("unreachable; should have decoded into one of the StageConfig fields: %+v", cfg)) } + + if err := checkFeatureStability(s.Name(), minStability); err != nil { + return nil, err + } + return s, nil } diff --git a/internal/component/loki/process/stages/structured_metadata_test.go b/internal/component/loki/process/stages/structured_metadata_test.go index 988bcdfe64..25955a8f1c 100644 --- a/internal/component/loki/process/stages/structured_metadata_test.go +++ b/internal/component/loki/process/stages/structured_metadata_test.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/loki/pkg/push" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -139,7 +140,7 @@ func Test_StructuredMetadataStage(t *testing.T) { } for name, test := range tests { t.Run(name, func(t *testing.T) { - pl, err := NewPipeline(util_log.Logger, loadConfig(test.pipelineStagesYaml), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(util_log.Logger, loadConfig(test.pipelineStagesYaml), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) result := processEntries(pl, newEntry(nil, nil, test.logLine, time.Now()))[0] diff --git a/internal/component/loki/process/stages/template_test.go b/internal/component/loki/process/stages/template_test.go index 21650a7de4..992e87847a 100644 --- a/internal/component/loki/process/stages/template_test.go +++ b/internal/component/loki/process/stages/template_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -58,7 +59,7 @@ var testTemplateLogLineWithMissingKey = ` ` func TestPipeline_Template(t *testing.T) { - pl, err := NewPipeline(util_log.Logger, loadConfig(testTemplateYaml), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(util_log.Logger, loadConfig(testTemplateYaml), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } @@ -75,7 +76,7 @@ func TestPipelineWithMissingKey_Template(t *testing.T) { var buf bytes.Buffer w := log.NewSyncWriter(&buf) logger := log.NewLogfmtLogger(w) - pl, err := NewPipeline(logger, loadConfig(testTemplateYaml), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testTemplateYaml), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } diff --git a/internal/component/loki/process/stages/tenant_test.go b/internal/component/loki/process/stages/tenant_test.go index dd9b8bd87f..d9a459fab7 100644 --- a/internal/component/loki/process/stages/tenant_test.go +++ b/internal/component/loki/process/stages/tenant_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/loki/v3/clients/pkg/promtail/client" lokiutil "github.com/grafana/loki/v3/pkg/util" @@ -39,7 +40,7 @@ func TestPipelineWithMissingKey_Tenant(t *testing.T) { var buf bytes.Buffer w := log.NewSyncWriter(&buf) logger := log.NewLogfmtLogger(w) - pl, err := NewPipeline(logger, loadConfig(testTenantAlloyExtractedData), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testTenantAlloyExtractedData), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) } diff --git a/internal/component/loki/process/stages/timestamp_test.go b/internal/component/loki/process/stages/timestamp_test.go index f1dff22267..759d65e1b0 100644 --- a/internal/component/loki/process/stages/timestamp_test.go +++ b/internal/component/loki/process/stages/timestamp_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/util" ) @@ -44,7 +45,7 @@ var testTimestampLogLineWithMissingKey = ` func TestTimestampPipeline(t *testing.T) { logger := util.TestAlloyLogger(t) - pl, err := NewPipeline(logger, loadConfig(testTimestampAlloy), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testTimestampAlloy), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) out := processEntries(pl, newEntry(nil, nil, testTimestampLogLine, time.Now()))[0] @@ -61,7 +62,7 @@ func TestPipelineWithMissingKey_Timestamp(t *testing.T) { var buf bytes.Buffer w := log.NewSyncWriter(&buf) logger := log.NewLogfmtLogger(w) - pl, err := NewPipeline(logger, loadConfig(testTimestampAlloy), nil, prometheus.DefaultRegisterer) + pl, err := NewPipeline(logger, loadConfig(testTimestampAlloy), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) require.NoError(t, err) _ = processEntries(pl, newEntry(nil, nil, testTimestampLogLineWithMissingKey, time.Now())) diff --git a/internal/component/loki/process/stages/windowsevent.go b/internal/component/loki/process/stages/windowsevent.go new file mode 100644 index 0000000000..0aca2ed1be --- /dev/null +++ b/internal/component/loki/process/stages/windowsevent.go @@ -0,0 +1,204 @@ +package stages + +import ( + "fmt" + "strings" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/util/strutil" +) + +const ( + defaultWindowsEventSource = "message" + descriptionLabel = "Description" +) + +type WindowsEventConfig struct { + Source string `alloy:"source,attr,optional"` + DropInvalidLabels bool `alloy:"drop_invalid_labels,attr,optional"` + OverwriteExisting bool `alloy:"overwrite_existing,attr,optional"` +} + +func (e *WindowsEventConfig) Validate() error { + if !model.LabelName(e.Source).IsValid() { + return fmt.Errorf(ErrInvalidLabelName, e.Source) + } + return nil +} + +func (e *WindowsEventConfig) SetToDefault() { + e.Source = defaultWindowsEventSource +} + +type WindowsEventStage struct { + cfg *WindowsEventConfig + logger log.Logger + + keyReplacer *strings.Replacer + valueReplacer *strings.Replacer +} + +// Create a windowsevent stage, including validating any supplied configuration +func newWindowsEventStage(logger log.Logger, cfg *WindowsEventConfig) Stage { + return &WindowsEventStage{ + cfg: cfg, + logger: log.With(logger, "component", "stage", "type", "windowsevent"), + keyReplacer: strings.NewReplacer("\t", "", "\r", "", "\n", "", " ", ""), + valueReplacer: strings.NewReplacer("\t", "", "\r", "", "\n", ""), + } +} + +func (w *WindowsEventStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + key := w.cfg.Source + go func() { + defer close(out) + for e := range in { + err := w.processEntry(e.Extracted, key) + if err != nil { + continue + } + out <- e + } + }() + return out +} + +// Process a windows event message from extracted with the specified key, adding additional +// entries into the extracted map. +func (w *WindowsEventStage) processEntry(extracted map[string]interface{}, key string) error { + value, ok := extracted[key] + if !ok { + if Debug { + level.Debug(w.logger).Log("msg", "source not in the extracted values", "source", key) + } + return nil + } + s, err := getString(value) + if err != nil { + level.Warn(w.logger).Log("msg", "invalid label value parsed", "value", value) + return err + } + + // Messages are expected to have sections that are split by empty lines. + sections := strings.Split(s, "\r\n\r\n") + for i, section := range sections { + + // The first section is extracted as the description of the message. + if i == 0 { + ek, err := w.sanitizeKey(descriptionLabel, extracted) + if err != nil { + w.logParseErr(err) + continue + } + ev, err := w.sanitizeValue(section) + if err != nil { + w.logParseErr(err) + continue + } + extracted[ek] = ev + continue + } + + j := 0 + lines := strings.Split(section, "\r\n") + keyPrefix := "" + for j < len(lines) { + parts := strings.SplitN(lines[j], ":", 2) + + // Skip lines that don't follow the key:value pattern. + if len(parts) < 2 { + j++ + continue + } + + ek := parts[0] + ev := parts[1] + j++ + + if ev == "" { + // Some messages have a section title such has: + // Logon Information: + // Logon Type:5 + // Virtual Account:No + // To avoid collisions with other sections, we use the section title as prefix + if j == 1 { + // The prefix is not sanitized here because the sanitization process should be + // applied on the full key only. Else it can add an unnecessary "_extracted" suffix to the prefix. + keyPrefix = ek + } + continue + } + + // Handle multi-line values. + // Following lines that are not empty and don't contain a ":" are considered part of the previous value. + for j < len(lines) && lines[j] != "" && !strings.Contains(lines[j], ":") { + ev += "," + lines[j] + j++ + } + + if keyPrefix != "" { + ek = keyPrefix + "_" + ek + } + + sanitizedKey, err := w.sanitizeKey(ek, extracted) + if err != nil { + w.logParseErr(err) + continue + } + + sanitizedValue, err := w.sanitizeValue(ev) + if err != nil { + w.logParseErr(err) + continue + } + extracted[sanitizedKey] = sanitizedValue + } + } + if Debug { + level.Debug(w.logger).Log("msg", "extracted data debug in windowsevent stage", + "extracted data", fmt.Sprintf("%v", extracted)) + } + return nil +} + +func (w *WindowsEventStage) sanitizeKey(ekey string, extracted map[string]interface{}) (string, error) { + k := w.keyReplacer.Replace(ekey) + if !model.LabelName(k).IsValid() { + if w.cfg.DropInvalidLabels { + return "", fmt.Errorf("invalid label parsed from message, key: %s", k) + } + k = strutil.SanitizeFullLabelName(k) + } + if _, ok := extracted[k]; ok && !w.cfg.OverwriteExisting { + level.Info(w.logger).Log("msg", "extracted key that already existed, appending _extracted to key", + "key", k) + k += "_extracted" + } + return k, nil +} + +func (w *WindowsEventStage) sanitizeValue(evalue string) (string, error) { + v := strings.TrimSpace(w.valueReplacer.Replace(evalue)) + if !model.LabelValue(v).IsValid() { + return "", fmt.Errorf("invalid value parsed from message, value: %s", v) + } + return v, nil +} + +func (w *WindowsEventStage) logParseErr(err error) { + if Debug { + level.Debug(w.logger).Log("msg", err.Error()) + } +} + +func (w *WindowsEventStage) Name() string { + return StageTypeWindowsEvent +} + +// Cleanup implements Stage. +func (*WindowsEventStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/windowsevent_test.go b/internal/component/loki/process/stages/windowsevent_test.go new file mode 100644 index 0000000000..c2c43a2e8d --- /dev/null +++ b/internal/component/loki/process/stages/windowsevent_test.go @@ -0,0 +1,244 @@ +package stages + +import ( + "errors" + "fmt" + "testing" + "time" + + util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/syntax" +) + +var testWindowsEventMsgDefaults = ` +stage.windowsevent {} +` + +var testWindowsEventMsgCustomSource = ` +stage.windowsevent { source = "CustomSource" } +` + +var testWindowsEventMsgDropInvalidLabels = ` +stage.windowsevent { drop_invalid_labels = true } +` + +var testWindowsEventMsgOverwriteExisting = ` +stage.windowsevent { overwrite_existing = true } +` + +func TestWindowsEvent(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + msgdata string + extractedValues map[string]interface{} + }{ + "System": { + msgdata: "Windows Update started downloading an update.", + extractedValues: map[string]interface{}{ + "Description": "Windows Update started downloading an update.", + }, + }, + "Setup": { + msgdata: "Initiating changes for package KB5044285. Current state is Superseded. Target state is Absent. Client id: Arbiter.", + extractedValues: map[string]interface{}{ + "Description": "Initiating changes for package KB5044285. Current state is Superseded. Target state is Absent. Client id: Arbiter.", + }, + }, + "Security1": { + msgdata: "Credential Manager credentials were read.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-5-21-1111111-1111111-1111111-1111\r\n\tAccount Name:\t\tBob\r\n\tAccount Domain:\t\tDESKTOP-AAAAAA\r\n\tLogon ID:\t\t0x11111111\r\n\tRead Operation:\t\tEnumerate Credentials\r\n\r\nThis event occurs when a user performs a read operation on stored credentials in Credential Manager.", + extractedValues: map[string]interface{}{ + "Description": "Credential Manager credentials were read.", + "Subject_SecurityID": "S-1-5-21-1111111-1111111-1111111-1111", + "Subject_AccountName": "Bob", + "Subject_AccountDomain": "DESKTOP-AAAAAA", + "Subject_LogonID": "0x11111111", + "Subject_ReadOperation": "Enumerate Credentials", + }, + }, + "Security2": { + msgdata: "An account was successfully logged on.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-1-1\r\n\tAccount Name:\t\tDESKTOP-AAAAA$\r\n\tAccount Domain:\t\tWORKGROUP\r\n\tLogon ID:\t\t0xAAA\r\n\r\nLogon Information:\r\n\tLogon Type:\t\t5\r\n\tRestricted Admin Mode:\t-\r\n\tRemote Credential Guard:\t-\r\n\tVirtual Account:\t\tNo\r\n\tElevated Token:\t\tYes\r\n\r\nImpersonation Level:\t\tImpersonation\r\n\r\nNew Logon:\r\n\tSecurity ID:\t\tS-1-1-1\r\n\tAccount Name:\t\tSYSTEM\r\n\tAccount Domain:\t\tNT AUTHORITY\r\n\tLogon ID:\t\t0xAAA\r\n\tLinked Logon ID:\t\t0x0\r\n\tNetwork Account Name:\t-\r\n\tNetwork Account Domain:\t-\r\n\tLogon GUID:\t\t{00000000-0000-0000-0000-000000000000}\r\n\r\nProcess Information:\r\n\tProcess ID:\t\t0x4c0\r\n\tProcess Name:\t\tC:\\Windows\\System32\\services.exe\r\n\r\nNetwork Information:\r\n\tWorkstation Name:\t-\r\n\tSource Network Address:\t-\r\n\tSource Port:\t\t-\r\n\r\nDetailed Authentication Information:\r\n\tLogon Process:\t\tAdvapi \r\n\tAuthentication Package:\tNegotiate\r\n\tTransited Services:\t-\r\n\tPackage Name (NTLM only):\t-\r\n\tKey Length:\t\t0\r\n\r\nThis event is generated when a logon session is created. It is generated on the computer that was accessed.\r\n\r\nThe subject fields indicate the account on the local system which requested the logon. This is most commonly a service such as the Server service, or a local process such as Winlogon.exe or Services.exe.\r\n\r\nThe logon type field indicates the kind of logon that occurred. The most common types are 2 (interactive) and 3 (network).\r\n\r\nThe New Logon fields indicate the account for whom the new logon was created, i.e. the account that was logged on.\r\n\r\nThe network fields indicate where a remote logon request originated. Workstation name is not always available and may be left blank in some cases.\r\n\r\nThe impersonation level field indicates the extent to which a process in the logon session can impersonate.\r\n\r\nThe authentication information fields provide detailed information about this specific logon request.\r\n\t- Logon GUID is a unique identifier that can be used to correlate this event with a KDC event.\r\n\t- Transited services indicate which intermediate services have participated in this logon request.\r\n\t- Package name indicates which sub-protocol was used among the NTLM protocols.\r\n\t- Key length indicates the length of the generated session key. This will be 0 if no session key was requested.", + extractedValues: map[string]interface{}{ + "Description": "An account was successfully logged on.", + "Subject_SecurityID": "S-1-1-1", + "Subject_AccountName": "DESKTOP-AAAAA$", + "Subject_AccountDomain": "WORKGROUP", + "Subject_LogonID": "0xAAA", + "LogonInformation_LogonType": "5", + "LogonInformation_RestrictedAdminMode": "-", + "LogonInformation_RemoteCredentialGuard": "-", + "LogonInformation_VirtualAccount": "No", + "LogonInformation_ElevatedToken": "Yes", + "ImpersonationLevel": "Impersonation", + "NewLogon_SecurityID": "S-1-1-1", + "NewLogon_AccountName": "SYSTEM", + "NewLogon_AccountDomain": "NT AUTHORITY", + "NewLogon_LogonID": "0xAAA", + "NewLogon_LinkedLogonID": "0x0", + "NewLogon_NetworkAccountName": "-", + "NewLogon_NetworkAccountDomain": "-", + "NewLogon_LogonGUID": "{00000000-0000-0000-0000-000000000000}", + "ProcessInformation_ProcessID": "0x4c0", + "ProcessInformation_ProcessName": "C:\\Windows\\System32\\services.exe", + "NetworkInformation_WorkstationName": "-", + "NetworkInformation_SourceNetworkAddress": "-", + "NetworkInformation_SourcePort": "-", + "DetailedAuthenticationInformation_LogonProcess": "Advapi", + "DetailedAuthenticationInformation_AuthenticationPackage": "Negotiate", + "DetailedAuthenticationInformation_TransitedServices": "-", + "DetailedAuthenticationInformation_PackageName_NTLMonly_": "-", + "DetailedAuthenticationInformation_KeyLength": "0", + }, + }, + "Security3": { + msgdata: "Special privileges assigned to new logon.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-1-1\r\n\tAccount Name:\t\tSYSTEM\r\n\tAccount Domain:\t\tNT AUTHORITY\r\n\tLogon ID:\t\t0xAAA\r\n\r\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\r\n\t\t\tSeTcbPrivilege\r\n\t\t\tSeSecurityPrivilege\r\n\t\t\tSeTakeOwnershipPrivilege\r\n\t\t\tSeLoadDriverPrivilege\r\n\t\t\tSeBackupPrivilege\r\n\t\t\tSeRestorePrivilege\r\n\t\t\tSeDebugPrivilege\r\n\t\t\tSeAuditPrivilege\r\n\t\t\tSeSystemEnvironmentPrivilege\r\n\t\t\tSeImpersonatePrivilege\r\n\t\t\tSeDelegateSessionUserImpersonatePrivilege", + extractedValues: map[string]interface{}{ + "Description": "Special privileges assigned to new logon.", + "Subject_SecurityID": "S-1-1-1", + "Subject_AccountName": "SYSTEM", + "Subject_AccountDomain": "NT AUTHORITY", + "Subject_LogonID": "0xAAA", + "Privileges": "SeAssignPrimaryTokenPrivilege,SeTcbPrivilege,SeSecurityPrivilege,SeTakeOwnershipPrivilege,SeLoadDriverPrivilege,SeBackupPrivilege,SeRestorePrivilege,SeDebugPrivilege,SeAuditPrivilege,SeSystemEnvironmentPrivilege,SeImpersonatePrivilege,SeDelegateSessionUserImpersonatePrivilege", + }, + }, + } + + for testName, testData := range tests { + testData := testData + testData.extractedValues["message"] = testData.msgdata + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + pl, err := NewPipeline(util_log.Logger, loadConfig(testWindowsEventMsgDefaults), nil, prometheus.DefaultRegisterer, featuregate.StabilityExperimental) + require.NoError(t, err, "Expected pipeline creation to not result in error") + out := processEntries(pl, + newEntry(map[string]interface{}{ + "message": testData.msgdata, + }, nil, testData.msgdata, time.Now()))[0] + require.Equal(t, testData.extractedValues, out.Extracted) + }) + } +} + +func TestWindowsEventArgs(t *testing.T) { + tests := map[string]struct { + config string + sourcekey string + msgdata string + extractedValues map[string]interface{} + }{ + "CustomSource": { + config: testWindowsEventMsgCustomSource, + sourcekey: "CustomSource", + msgdata: "This is a test message.\r\n\r\nKey1: Value 1\r\nKey2: Value 2", + extractedValues: map[string]interface{}{ + "Description": "This is a test message.", + "Key1": "Value 1", + "Key2": "Value 2", + "testOverride": "initial", + }, + }, + "DropInvalid": { + config: testWindowsEventMsgDropInvalidLabels, + sourcekey: "message", + msgdata: "This is a test message.\r\n\r\nInvalid(label): Value 1\r\nKey2: Value 2", + extractedValues: map[string]interface{}{ + "Description": "This is a test message.", + "Key2": "Value 2", + "testOverride": "initial", + }, + }, + "OverrideExisting": { + config: testWindowsEventMsgOverwriteExisting, + sourcekey: "message", + msgdata: "This is a test message.\r\n\r\ntestOverride: newValue\r\nKey2: Value 2", + extractedValues: map[string]interface{}{ + "Description": "This is a test message.", + "Key2": "Value 2", + "testOverride": "newValue", + }, + }, + "DontOverride": { + config: testWindowsEventMsgDefaults, + sourcekey: "message", + msgdata: "This is a test message.\r\n\r\ntestOverride: newValue\r\nKey2: Value 2", + extractedValues: map[string]interface{}{ + "Description": "This is a test message.", + "Key2": "Value 2", + "testOverride": "initial", + "testOverride_extracted": "newValue", + }, + }, + } + for testName, testData := range tests { + testData := testData + testData.extractedValues[testData.sourcekey] = testData.msgdata + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityExperimental) + require.NoError(t, err, "Expected pipeline creation to not result in error") + out := processEntries(pl, + newEntry(map[string]interface{}{ + testData.sourcekey: testData.msgdata, + "testOverride": "initial", + }, nil, testData.msgdata, time.Now()))[0] + require.Equal(t, testData.extractedValues, out.Extracted) + }) + } +} + +func TestWindowsEventValidate(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + config string + err error + }{ + "valid config": { + `stage.windowsevent { source = "msg"}`, + nil, + }, + "invalid config": { + `stage.windowsevent { source = 1}`, + errors.New("invalid label name: 1"), + }, + "invalid source": { + `stage.windowsevent { source = "the message"}`, + fmt.Errorf(ErrInvalidLabelName, "the message"), + }, + "empty source": { + `stage.windowsevent { source = ""}`, + fmt.Errorf(ErrInvalidLabelName, ""), + }, + } + for tName, tt := range tests { + tt := tt + t.Run(tName, func(t *testing.T) { + var config Configs + err := syntax.Unmarshal([]byte(tt.config), &config) + if err == nil { + require.Len(t, config.Stages, 1) + err = config.Stages[0].WindowsEventConfig.Validate() + } + + if err == nil && tt.err != nil { + require.NotNil(t, err, "windowsevent.validate() expected error = %v, but got nil", tt.err) + } + if err != nil { + require.Equal(t, tt.err.Error(), err.Error(), "windowsevent.validate() expected error = %v, actual error = %v", tt.err, err) + } + }) + } +} + +func TestWindowsEventStabilityLevel(t *testing.T) { + _, err := NewPipeline(util_log.Logger, loadConfig(testWindowsEventMsgDefaults), nil, prometheus.DefaultRegisterer, featuregate.StabilityPublicPreview) + require.ErrorContains(t, err, `invalid stage config stage "windowsevent" is at stability level "experimental", which is below the minimum allowed stability level "public-preview". Use --stability.level command-line flag to enable "experimental" features`) +}