Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add windowsevent stage loki process #2545

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ Main (unreleased)

### Features

- Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb)
- (_Experimental_) Add a `stage.windowsevent` block in the `loki.process` component. This aims to replace the existing `stage.eventlogmessage`. (@wildum)

### Enhancements

- Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb)

- (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Improve parsing of truncated queries in `database_observability.mysql` (@cristiangreco)
Expand Down
90 changes: 90 additions & 0 deletions docs/sources/reference/components/loki/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The following blocks are supported inside the definition of `loki.process`:
| stage.template | [stage.template][] | Configures a `template` processing stage. | no |
| stage.tenant | [stage.tenant][] | Configures a `tenant` processing stage. | no |
| stage.timestamp | [stage.timestamp][] | Configures a `timestamp` processing stage. | no |
| stage.windowsevent | [stage.windowsevent][] | Configures a `windowsevent` processing stage. | no |

A user can provide any number of these stage blocks nested inside `loki.process`; these will run in order of appearance in the configuration file.

Expand Down Expand Up @@ -98,6 +99,7 @@ A user can provide any number of these stage blocks nested inside `loki.process`
[stage.template]: #stagetemplate-block
[stage.tenant]: #stagetenant-block
[stage.timestamp]: #stagetimestamp-block
[stage.windowsevent]: #stagewindowsevent-block


### stage.cri block
Expand Down Expand Up @@ -242,6 +244,8 @@ stage.drop {

### stage.eventlogmessage block

Deprecated in favor of the [stage.windowsevent block][stage.windowsevent].

The `eventlogmessage` stage extracts data from the Message string that appears in the Windows Event Log.

The following arguments are supported:
Expand Down Expand Up @@ -1694,6 +1698,92 @@ loki.process "example" {
The `json` stage extracts the IP address from the `client_ip` key in the log line.
Then the extracted `ip` value is given as source to geoip stage. The geoip stage performs a lookup on the IP and populates the shared map with the data from the city database results in addition to the custom lookups. Lastly, the custom lookup fields from the shared map are added as labels.

### stage.windowsevent block

The `windowsevent` stage extracts data from the message string that appears in the Windows Event Log.
wildum marked this conversation as resolved.
Show resolved Hide resolved

The following arguments are supported:

| Name | Type | Description | Default | Required |
|-----------------------|----------|--------------------------------------------------------|-----------|----------|
| `source` | `string` | Name of the field in the extracted data to parse. | `message` | no |
| `overwrite_existing` | `bool` | Whether to overwrite existing extracted data fields. | `false` | no |
| `drop_invalid_labels` | `bool` | Whether to drop fields that are not valid label names. | `false` | no |

When `overwrite_existing` is set to `true`, the stage overwrites existing extracted data fields with the same name.
If set to `false`, the `_extracted` suffix will be appended to an already existing field name.
wildum marked this conversation as resolved.
Show resolved Hide resolved

When `drop_invalid_labels` is set to `true`, the stage drops fields that are not valid label names.
wildum marked this conversation as resolved.
Show resolved Hide resolved
If set to `false`, the stage will automatically convert them into valid labels replacing invalid characters with underscores.
wildum marked this conversation as resolved.
Show resolved Hide resolved

The `windowsevent` stage expects the message to structured in sections that are split by empty lines.
wildum marked this conversation as resolved.
Show resolved Hide resolved

The first section of the input is treated as a whole block and stored in the extracted map with the key `Description`.

Sections following the Description are expected to contain key-value pairs in the format key:value.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if they don't have key:value pairs? The "are expected" suggests that it may not happen this way.

Is this a "must"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should rephrase because it's not a must but the lines that don't have a key:value pair will be ignored (unless there was already a key:value pair in the section. In this case it's considered a multi line value)

Copy link
Contributor

@clayton-cornell clayton-cornell Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just adding the bit about what happens if a key:value pair isn't found will help clarify what happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's documented a bit below with "Lines in a section without a preceding valid entry (key-value pair) are ignored and discarded."


If the first line of a section has no value (e.g., "Subject:"), the key will act as a prefix for subsequent keys in the same section.
wildum marked this conversation as resolved.
Show resolved Hide resolved

If a line within a section does not include the `:` symbol, it is considered part of the previous entry's value. The line is appended to the previous value, separated by a comma.

Lines in a section without a preceding valid entry (key-value pair) are ignored and discarded.

#### Example with `loki.source.windowsevent`

```alloy
loki.source.windowsevent "security" {
eventlog_name = "Security"
forward_to = [loki.process.default.receiver]
}

loki.process "default" {
forward_to = [loki.write.default.receiver]

stage.json {
expressions = {
message = "",
Overwritten = "",
}
}

stage.windowsevent {
source = "message"
overwrite_existing = true
}

stage.labels {
values = {
Description = "",
Subject_SecurityID = "",
ReadOP = "Subject_ReadOperation",
}
}
}
```

The `loki.source.windowsevent` component forwards Windows security events to the `loki.process` component.

Given the following event:
```
{"event_id": 1, "Overwritten": "old", "message": ""Special privileges assigned to new logon.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-1-1\r\n\tAccount Name:\t\tSYSTEM\r\n\tAccount Domain:\t\tNT AUTHORITY\r\n\tLogon ID:\t\t0xAAA\r\n\r\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\r\n\t\t\tSeTcbPrivilege\r\n\t\t\tSeSecurityPrivilege\r\n\t\t\tSeTakeOwnershipPrivilege\r\n\t\t\tSeLoadDriverPrivilege\r\n\t\t\tSeBackupPrivilege\r\n\t\t\tSeRestorePrivilege\r\n\t\t\tSeDebugPrivilege\r\n\t\t\tSeAuditPrivilege\r\n\t\t\tSeSystemEnvironmentPrivilege\r\n\t\t\tSeImpersonatePrivilege\r\n\t\t\tSeDelegateSessionUserImpersonatePrivilege""}
```

The `json` stage would create the following key-value pairs in the set of extracted data:

- `message`: `"Special privileges assigned to new logon.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-1-1\r\n\tAccount Name:\t\tSYSTEM\r\n\tAccount Domain:\t\tNT AUTHORITY\r\n\tLogon ID:\t\t0xAAA\r\n\r\nPrivileges:\t\tSeAssignPrimaryTokenPrivilege\r\n\t\t\tSeTcbPrivilege\r\n\t\t\tSeSecurityPrivilege"`
- `Overwritten`: `old`

The `windowsevent` stage will parse the value of `message` from the extracted data and append/overwrite the following key-value pairs to the set of extracted data:

- `Description`: "Special privileges assigned to new logon.",
- `Subject_SecurityID`: "S-1-1-1",
- `Subject_AccountName`: "SYSTEM",
- `Subject_AccountDomain`: "NT AUTHORITY",
- `Subject_LogonID`: "0xAAA",
- `Privileges`: "SeAssignPrimaryTokenPrivilege,SeTcbPrivilege,SeSecurityPrivilege",

Finally the `labels` stage will use the extracted values `Description`, `Subject_SecurityID` and `Subject_ReadOperation` to add them as labels of the log entry before forwarding it to a `loki.write` component.

## Exported fields

The following fields are exported and can be referenced by other components:
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *Component) Update(args component.Arguments) error {
c.entryHandler.Stop()
}

pipeline, err := stages.NewPipeline(c.opts.Logger, newArgs.Stages, &c.opts.ID, c.opts.Registerer)
pipeline, err := stages.NewPipeline(c.opts.Logger, newArgs.Stages, &c.opts.ID, c.opts.Registerer, c.opts.MinStability)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/component/loki/process/stages/decolorize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"

"github.com/grafana/alloy/internal/featuregate"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand Down Expand Up @@ -40,7 +41,7 @@ func TestPipeline_Decolorize(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/component/loki/process/stages/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/util"
)

Expand Down Expand Up @@ -433,7 +434,7 @@ func TestDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "test_drop_pipeline"
logger := util.TestAlloyLogger(t)
pl, err := NewPipeline(logger, loadConfig(testDropAlloy), &plName, registry)
pl, err := NewPipeline(logger, loadConfig(testDropAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable)
require.NoError(t, err)
out := processEntries(pl,
newEntry(nil, nil, testMatchLogLineApp1, time.Now()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/syntax"
)

Expand Down Expand Up @@ -107,7 +108,7 @@ func TestEventLogMessage_simple(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl,
newEntry(map[string]interface{}{
Expand Down Expand Up @@ -267,7 +268,7 @@ func TestEventLogMessage_Real(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl,
newEntry(map[string]interface{}{testData.sourcekey: testData.msgdata}, nil, testData.msgdata, time.Now()))[0]
Expand Down Expand Up @@ -323,7 +324,7 @@ func TestEventLogMessage_invalid(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl,
newEntry(map[string]interface{}{testData.sourcekey: testData.msgdata}, nil, testData.msgdata, time.Now()))[0]
Expand All @@ -335,7 +336,7 @@ func TestEventLogMessage_invalid(t *testing.T) {
func TestEventLogMessage_invalidString(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testEvtLogMsgYamlDefaults), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testEvtLogMsgYamlDefaults), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl,
newEntry(map[string]interface{}{"message": nil}, nil, "", time.Now()))
Expand Down
9 changes: 5 additions & 4 deletions internal/component/loki/process/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/syntax"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -55,7 +56,7 @@ func (args *CRIConfig) Validate() error {

// NewDocker creates a predefined pipeline for parsing entries in the Docker
// json log format.
func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
func NewDocker(logger log.Logger, registerer prometheus.Registerer, minStability featuregate.Stability) (Stage, error) {
stages := []StageConfig{
{
JSONConfig: &JSONConfig{
Expand Down Expand Up @@ -83,7 +84,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro
},
},
}
return NewPipeline(logger, stages, nil, registerer)
return NewPipeline(logger, stages, nil, registerer, minStability)
}

type cri struct {
Expand Down Expand Up @@ -169,7 +170,7 @@ func (c *cri) ensureTruncateIfRequired(e *Entry) {

// NewCRI creates a predefined pipeline for parsing entries in the CRI log
// format.
func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registerer) (Stage, error) {
func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registerer, minStability featuregate.Stability) (Stage, error) {
base := []StageConfig{
{
RegexConfig: &RegexConfig{
Expand Down Expand Up @@ -199,7 +200,7 @@ func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registere
},
}

p, err := NewPipeline(logger, base, nil, registerer)
p, err := NewPipeline(logger, base, nil, registerer, minStability)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions internal/component/loki/process/stages/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/featuregate"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand Down Expand Up @@ -68,7 +69,7 @@ func TestNewDocker(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewDocker(util_log.Logger, prometheus.DefaultRegisterer)
p, err := NewDocker(util_log.Logger, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatalf("failed to create Docker parser: %s", err)
}
Expand Down Expand Up @@ -192,7 +193,7 @@ func TestCRI_tags(t *testing.T) {
MaxPartialLineSize: tt.maxPartialLineSize,
MaxPartialLineSizeTruncate: tt.maxPartialLineSizeTruncate,
}
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer)
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
require.NoError(t, err)

got := make([]string, 0)
Expand Down Expand Up @@ -275,7 +276,7 @@ func TestNewCri(t *testing.T) {
t.Run(tName, func(t *testing.T) {
t.Parallel()
cfg := DefaultCRIConfig
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer)
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatalf("failed to create CRI parser: %s", err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/component/loki/process/stages/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"

"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax"
)
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestPipeline_JSON(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0]
assert.Equal(t, testData.expectedExtract, out.Extracted)
Expand Down Expand Up @@ -341,7 +342,7 @@ func TestJSONParser_Parse(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := New(logger, nil, tt.config, nil)
p, err := New(logger, nil, tt.config, nil, featuregate.StabilityGenerallyAvailable)
assert.NoError(t, err, "failed to create json parser: %s", err)
out := processEntries(p, newEntry(tt.extracted, nil, tt.entry, time.Now()))[0]

Expand Down
5 changes: 3 additions & 2 deletions internal/component/loki/process/stages/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"

"github.com/grafana/alloy/internal/featuregate"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand Down Expand Up @@ -40,7 +41,7 @@ var testLabelsLogLineWithMissingKey = `
`

func TestLabelsPipeline_Labels(t *testing.T) {
pl, err := NewPipeline(util_log.Logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(util_log.Logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatal(err)
}
Expand All @@ -57,7 +58,7 @@ func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) {
var buf bytes.Buffer
w := log.NewSyncWriter(&buf)
logger := log.NewLogfmtLogger(w)
pl, err := NewPipeline(logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer)
pl, err := NewPipeline(logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 4 additions & 3 deletions internal/component/loki/process/stages/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/featuregate"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand Down Expand Up @@ -58,7 +59,7 @@ var plName = "testPipeline"
// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitWaitPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitAlloy), &plName, registry)
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable)
logs := make([]Entry, 0)
logCount := 5
for i := 0; i < logCount; i++ {
Expand All @@ -76,7 +77,7 @@ func TestLimitWaitPipeline(t *testing.T) {
// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropAlloy), &plName, registry)
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable)
logs := make([]Entry, 0)
logCount := 10
for i := 0; i < logCount; i++ {
Expand All @@ -94,7 +95,7 @@ func TestLimitDropPipeline(t *testing.T) {
// TestLimitByLabelPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitByLabelPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitByLabelAlloy), &plName, registry)
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitByLabelAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable)
logs := make([]Entry, 0)
logCount := 5
for i := 0; i < logCount; i++ {
Expand Down
Loading
Loading