diff --git a/CHANGELOG.md b/CHANGELOG.md index fe132e4052..4b314e4b76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,8 @@ Main (unreleased) - Bump snmp_exporter and embedded modules to 0.27.0. Add support for multi-module handling by comma separation and expose argument to increase SNMP polling concurrency for `prometheus.exporter.snmp`. (@v-zhuravlev) +- Reduce CPU usage of `loki.source.windowsevent` by up to 60% by updating the bookmark file every 10 seconds instead of after every event. (@wildum) + - Add support for pushv1.PusherService Connect API in `pyroscope.receive_http`. (@simonswine) - Add support for path prefixes in `pyroscope.scrape` to allow scraping targets behind a proxy or with custom URL paths. (@korniltsev) @@ -58,6 +60,7 @@ Main (unreleased) - Change profile handling in `pyroscope.receive_http` and `pyroscope.write` components to use in-memory processing instead of pipes. (@marcsanmi) + v1.6.1 ----------------- diff --git a/internal/component/loki/source/windowsevent/bookmark.go b/internal/component/loki/source/windowsevent/bookmark.go index c6882a4372..b18fa6a06e 100644 --- a/internal/component/loki/source/windowsevent/bookmark.go +++ b/internal/component/loki/source/windowsevent/bookmark.go @@ -1,7 +1,7 @@ //go:build windows // +build windows -// This code is copied from Promtail v1.6.2-0.20231004111112-07cbef92268a with minor changes. +// This code is adapted from loki/promtail. Last revision used to port changes to Alloy was v1.6.2-0.20231004111112-07cbef92268a. package windowsevent @@ -12,9 +12,10 @@ import ( "io/fs" "os" - "github.com/natefinch/atomic" + uberAtomic "go.uber.org/atomic" "github.com/grafana/loki/v3/clients/pkg/promtail/targets/windows/win_eventlog" + "github.com/natefinch/atomic" ) type bookMark struct { @@ -22,6 +23,8 @@ type bookMark struct { isNew bool path string buf []byte + + bookmarkStr *uberAtomic.String } // newBookMark creates a new windows event bookmark. @@ -33,19 +36,21 @@ func newBookMark(path string) (*bookMark, error) { _, err := os.Stat(path) // creates a new bookmark file if none exists. if errors.Is(err, fs.ErrNotExist) { - _, err := os.Create(path) + f, err := os.Create(path) if err != nil { return nil, err } + defer f.Close() bm, err := win_eventlog.CreateBookmark("") if err != nil { return nil, err } return &bookMark{ - handle: bm, - path: path, - isNew: true, - buf: buf, + handle: bm, + path: path, + isNew: true, + buf: buf, + bookmarkStr: uberAtomic.NewString(""), }, nil } if err != nil { @@ -74,18 +79,24 @@ func newBookMark(path string) (*bookMark, error) { } } return &bookMark{ - handle: bm, - path: path, - isNew: fileString == "", - buf: buf, + handle: bm, + path: path, + isNew: fileString == "", + buf: buf, + bookmarkStr: uberAtomic.NewString(""), }, nil } -// save Saves the bookmark at the current event position. -func (b *bookMark) save(event win_eventlog.EvtHandle) error { +func (b *bookMark) update(event win_eventlog.EvtHandle) error { newBookmark, err := win_eventlog.UpdateBookmark(b.handle, event, b.buf) if err != nil { return err } - return atomic.WriteFile(b.path, bytes.NewReader([]byte(newBookmark))) + b.bookmarkStr.Store(newBookmark) + return nil +} + +// save Saves the bookmark at the current event position. +func (b *bookMark) save() error { + return atomic.WriteFile(b.path, bytes.NewReader([]byte(b.bookmarkStr.Load()))) } diff --git a/internal/component/loki/source/windowsevent/component_test.go b/internal/component/loki/source/windowsevent/component_test.go index 8969a98bfb..2f3af0c534 100644 --- a/internal/component/loki/source/windowsevent/component_test.go +++ b/internal/component/loki/source/windowsevent/component_test.go @@ -16,10 +16,12 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "go.uber.org/goleak" "golang.org/x/sys/windows/svc/eventlog" ) func TestEventLogger(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) var loggerName = "alloy_test" //Setup Windows Event log with the log source name and logging levels _ = eventlog.InstallAsEventCreate(loggerName, eventlog.Info|eventlog.Warning|eventlog.Error) @@ -73,7 +75,7 @@ func TestEventLogger(t *testing.T) { } func TestLegacyBookmarkConversion(t *testing.T) { - + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) bookmarkText := ` ` @@ -112,4 +114,10 @@ func TestLegacyBookmarkConversion(t *testing.T) { dd, _ := os.ReadFile(c.args.BookmarkPath) // The New function will convert via calling update. require.True(t, string(dd) == bookmarkText) + + // Run the component and cancel it to stop the target that was started in New() + ctx := context.Background() + ctx, cancelFunc := context.WithTimeout(ctx, 10*time.Second) + go c.Run(ctx) + cancelFunc() } diff --git a/internal/component/loki/source/windowsevent/component_windows.go b/internal/component/loki/source/windowsevent/component_windows.go index 2d65e7ec74..0dae7c393e 100644 --- a/internal/component/loki/source/windowsevent/component_windows.go +++ b/internal/component/loki/source/windowsevent/component_windows.go @@ -5,6 +5,7 @@ import ( "os" "path" "sync" + "time" "github.com/grafana/loki/v3/clients/pkg/promtail/api" "github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig" @@ -116,10 +117,13 @@ func (c *Component) Update(args component.Arguments) error { return err } - winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs)) + // Same as the loki.source.file sync position period + bookmarkSyncPeriod := 10 * time.Second + winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs), bookmarkSyncPeriod) if err != nil { return err } + // Stop the original target. if c.target != nil { err := c.target.Stop() diff --git a/internal/component/loki/source/windowsevent/target.go b/internal/component/loki/source/windowsevent/target.go index d18ad47115..52eafe97b1 100644 --- a/internal/component/loki/source/windowsevent/target.go +++ b/internal/component/loki/source/windowsevent/target.go @@ -1,7 +1,7 @@ //go:build windows // +build windows -// This code is copied from Promtail v1.6.2-0.20231004111112-07cbef92268a with minor changes. +// This code is adapted from loki/promtail. Last revision used to port changes to Alloy was v1.6.2-0.20231004111112-07cbef92268a. package windowsevent @@ -49,6 +49,7 @@ func NewTarget( handler api.EntryHandler, relabel []*relabel.Config, cfg *scrapeconfig.WindowsEventsTargetConfig, + bookmarkSyncPeriod time.Duration, ) (*Target, error) { sigEvent, err := windows.CreateEvent(nil, 0, 0, nil) if err != nil { @@ -91,6 +92,7 @@ func NewTarget( t.cfg.PollInterval = 3 * time.Second } go t.loop() + go t.updateBookmark(bookmarkSyncPeriod) return t, nil } @@ -120,15 +122,17 @@ func (t *Target) loop() { } t.err = nil // we have received events to handle. - for i, entry := range t.renderEntries(events) { + for _, entry := range t.renderEntries(events) { t.handler.Chan() <- entry - if err := t.bm.save(handles[i]); err != nil { + } + if len(handles) != 0 { + err = t.bm.update(handles[len(handles)-1]) + if err != nil { t.err = err - level.Error(util_log.Logger).Log("msg", "error saving bookmark", "err", err) + level.Error(util_log.Logger).Log("msg", "error updating in-memory bookmark", "err", err) } } win_eventlog.Close(handles) - } // no more messages we wait for next poll timer tick. select { @@ -139,6 +143,32 @@ func (t *Target) loop() { } } +func (t *Target) updateBookmark(bookmarkSyncPeriod time.Duration) { + t.wg.Add(1) + + bookmarkTick := time.NewTicker(bookmarkSyncPeriod) + defer func() { + bookmarkTick.Stop() + t.wg.Done() + }() + + for { + select { + case <-bookmarkTick.C: + t.saveBookmarkPosition() + case <-t.done: + return + } + } +} + +func (t *Target) saveBookmarkPosition() { + if err := t.bm.save(); err != nil { + t.err = err + level.Error(util_log.Logger).Log("msg", "error saving bookmark", "err", err) + } +} + // renderEntries renders Loki entries from windows event logs func (t *Target) renderEntries(events []win_eventlog.Event) []api.Entry { res := make([]api.Entry, 0, len(events)) @@ -226,5 +256,6 @@ func (t *Target) Stop() error { close(t.done) t.wg.Wait() t.handler.Stop() + t.saveBookmarkPosition() return t.err } diff --git a/internal/component/loki/source/windowsevent/target_test.go b/internal/component/loki/source/windowsevent/target_test.go new file mode 100644 index 0000000000..074bcb9f61 --- /dev/null +++ b/internal/component/loki/source/windowsevent/target_test.go @@ -0,0 +1,70 @@ +//go:build windows + +package windowsevent + +import ( + "os" + "path" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/grafana/alloy/internal/component/common/loki/utils" + "github.com/grafana/loki/v3/clients/pkg/promtail/api" + "github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "golang.org/x/sys/windows/svc/eventlog" +) + +func TestBookmarkUpdate(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + var loggerName = "alloy_test" + _ = eventlog.InstallAsEventCreate(loggerName, eventlog.Info|eventlog.Warning|eventlog.Error) + wlog, err := eventlog.Open(loggerName) + require.NoError(t, err) + + dirPath := "bookmarktest" + filePath := path.Join(dirPath, "bookmark.xml") + require.NoError(t, os.MkdirAll(path.Dir(filePath), 700)) + defer func() { + require.NoError(t, os.RemoveAll(dirPath)) + }() + + scrapeConfig := &scrapeconfig.WindowsEventsTargetConfig{ + Locale: 0, + EventlogName: "Application", + Query: "*", + UseIncomingTimestamp: false, + BookmarkPath: filePath, + PollInterval: 10 * time.Millisecond, + ExcludeEventData: false, + ExcludeEventMessage: false, + ExcludeUserData: false, + Labels: utils.ToLabelSet(map[string]string{"job": "windows"}), + } + handle := &handler{handler: make(chan api.Entry)} + winTarget, err := NewTarget(log.NewLogfmtLogger(os.Stderr), handle, nil, scrapeConfig, 1000*time.Millisecond) + require.NoError(t, err) + + tm := time.Now().Format(time.RFC3339Nano) + err = wlog.Info(2, tm) + require.NoError(t, err) + + select { + case e := <-handle.handler: + require.Equal(t, model.LabelValue("windows"), e.Labels["job"]) + case <-time.After(3 * time.Second): + require.FailNow(t, "failed waiting for event") + } + winTarget.Stop() + + require.NoError(t, wlog.Close()) + + content, err := os.ReadFile(filePath) + require.NoError(t, err) + // check that only the start because the RecordId changes + require.Contains(t, string(content), "\r\n