From bf659c048cc4aa6e5c01e90eb6ed3f4fcc7f06f6 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Mon, 13 Jan 2025 15:05:17 +0100 Subject: [PATCH 1/6] use runner pkg in loki source file --- CHANGELOG.md | 4 + .../loki/source/file/decompresser.go | 82 ++++--- .../loki/source/file/decompresser_test.go | 137 ++++++++--- internal/component/loki/source/file/file.go | 162 +++++-------- .../component/loki/source/file/file_test.go | 66 ++++++ internal/component/loki/source/file/reader.go | 2 +- internal/component/loki/source/file/runner.go | 66 ++++++ internal/component/loki/source/file/tailer.go | 219 +++++++++--------- .../component/loki/source/file/tailer_test.go | 91 ++++++++ 9 files changed, 543 insertions(+), 286 deletions(-) create mode 100644 internal/component/loki/source/file/runner.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 827e29b14e..27733c9d20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,10 @@ Main (unreleased) - Add json format support for log export via faro receiver (@ravishankar15) +### Bugfixes + +- Fix log rotation for Windows in `loki.source.file` by refactoring the component to use the runner pkg. This should also reduce CPU consumption when tailing a lot of files in a dynamic environment. (@wildum) + v1.6.0-rc.1 ----------------- diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index f9dd45d4ad..7b000dfd72 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -1,6 +1,6 @@ package file -// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// This code is adapted from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. // Decompressor implements the reader interface and is used to read compressed log files. // It uses the Go stdlib's compress/* packages for decoding. 
@@ -43,19 +43,19 @@ func supportedCompressedFormats() map[string]struct{} { type decompressor struct { metrics *metrics logger log.Logger - handler loki.EntryHandler + receiver loki.LogsReceiver positions positions.Positions - path string - labels string + path string + labels model.LabelSet + labelsStr string posAndSizeMtx sync.Mutex - stopOnce sync.Once running *atomic.Bool - posquit chan struct{} - posdone chan struct{} - done chan struct{} + posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop + posdone chan struct{} // used by the updatePosition method to notify when it stopped + done chan struct{} // used by the readLine method to notify when it stopped decoder *encoding.Decoder @@ -67,17 +67,19 @@ type decompressor struct { func newDecompressor( metrics *metrics, logger log.Logger, - handler loki.EntryHandler, + receiver loki.LogsReceiver, positions positions.Positions, path string, - labels string, + labels model.LabelSet, encodingFormat string, cfg DecompressionConfig, ) (*decompressor, error) { + labelsStr := labels.String() + logger = log.With(logger, "component", "decompressor") - pos, err := positions.Get(path, labels) + pos, err := positions.Get(path, labelsStr) if err != nil { return nil, fmt.Errorf("failed to get positions: %w", err) } @@ -95,10 +97,11 @@ func newDecompressor( decompressor := &decompressor{ metrics: metrics, logger: logger, - handler: loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler), + receiver: receiver, positions: positions, path: path, labels: labels, + labelsStr: labelsStr, running: atomic.NewBool(false), posquit: make(chan struct{}), posdone: make(chan struct{}), @@ -108,9 +111,6 @@ func newDecompressor( cfg: cfg, } - go decompressor.readLines() - go decompressor.updatePosition() - metrics.filesActive.Add(1.) return decompressor, nil } @@ -151,6 +151,18 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade return reader, nil } +func (d *decompressor) Run() { + labelsMiddleware := d.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(d.path)}) + handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {})) + defer handler.Stop() + d.posquit = make(chan struct{}) + d.posdone = make(chan struct{}) + d.done = make(chan struct{}) + go d.updatePosition() + d.metrics.filesActive.Add(1.) + d.readLines(handler) +} + func (d *decompressor) updatePosition() { positionSyncPeriod := d.positions.SyncPeriod() positionWait := time.NewTicker(positionSyncPeriod) @@ -163,7 +175,7 @@ func (d *decompressor) updatePosition() { for { select { case <-positionWait.C: - if err := d.MarkPositionAndSize(); err != nil { + if err := d.markPositionAndSize(); err != nil { level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err) return } @@ -178,7 +190,7 @@ func (d *decompressor) updatePosition() { // It first decompresses the file as a whole using a reader and then it will iterate // over its chunks, separated by '\n'. // During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp. 
-func (d *decompressor) readLines() { +func (d *decompressor) readLines(handler loki.EntryHandler) { level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path) d.running.Store(true) @@ -188,11 +200,12 @@ func (d *decompressor) readLines() { } defer func() { + d.running.Store(false) d.cleanupMetrics() level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path) close(d.done) }() - entries := d.handler.Chan() + entries := handler.Chan() f, err := os.Open(d.path) if err != nil { @@ -261,36 +274,31 @@ func (d *decompressor) readLines() { } } -func (d *decompressor) MarkPositionAndSize() error { - // Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file. +func (d *decompressor) markPositionAndSize() error { + // Lock this update because it can be called in two different threads d.posAndSizeMtx.Lock() defer d.posAndSizeMtx.Unlock() d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size)) d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position)) - d.positions.Put(d.path, d.labels, d.position) + d.positions.Put(d.path, d.labelsStr, d.position) return nil } func (d *decompressor) Stop() { - // stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once - // we wrap the stop in a sync.Once. - d.stopOnce.Do(func() { - // Shut down the position marker thread - close(d.posquit) - <-d.posdone - - // Save the current position before shutting down reader - if err := d.MarkPositionAndSize(); err != nil { - level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err) - } + // Shut down the position marker thread + close(d.posquit) + <-d.posdone + + // Save the current position before shutting down reader + if err := d.markPositionAndSize(); err != nil { + level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err) + } - // Wait for readLines() to consume all the remaining messages and exit when the channel is closed - <-d.done - level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path) - d.handler.Stop() - }) + // Wait for readLines() to consume all the remaining messages and exit when the channel is closed + <-d.done + level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path) } func (d *decompressor) IsRunning() bool { diff --git a/internal/component/loki/source/file/decompresser_test.go b/internal/component/loki/source/file/decompresser_test.go index 204fa55a0e..3b892f0164 100644 --- a/internal/component/loki/source/file/decompresser_test.go +++ b/internal/component/loki/source/file/decompresser_test.go @@ -5,17 +5,23 @@ package file import ( "os" + "path/filepath" "sync" "testing" "time" "github.com/grafana/alloy/internal/component/common/loki/client/fake" + "github.com/grafana/alloy/internal/component/common/loki/positions" + "github.com/grafana/alloy/internal/util" "github.com/go-kit/log" "github.com/grafana/alloy/internal/component/common/loki" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "go.uber.org/goleak" ) type noopClient struct { @@ -64,17 +70,17 @@ func BenchmarkReadlines(b *testing.B) { for _, tc := range scenarios { b.Run(tc.name, func(b *testing.B) { decBase := &decompressor{ - logger: log.NewNopLogger(), - running: 
atomic.NewBool(false), - handler: entryHandler, - path: tc.file, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: tc.file, } for i := 0; i < b.N; i++ { newDec := decBase newDec.metrics = newMetrics(prometheus.NewRegistry()) newDec.done = make(chan struct{}) - newDec.readLines() + newDec.readLines(entryHandler) <-newDec.done } }) @@ -87,16 +93,16 @@ func TestGigantiqueGunzipFile(t *testing.T) { defer handler.Stop() d := &decompressor{ - logger: log.NewNopLogger(), - running: atomic.NewBool(false), - handler: handler, - path: file, - done: make(chan struct{}), - metrics: newMetrics(prometheus.NewRegistry()), - cfg: DecompressionConfig{Format: "gz"}, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: file, + done: make(chan struct{}), + metrics: newMetrics(prometheus.NewRegistry()), + cfg: DecompressionConfig{Format: "gz"}, } - d.readLines() + d.readLines(handler) <-d.done time.Sleep(time.Millisecond * 200) @@ -117,16 +123,16 @@ func TestOnelineFiles(t *testing.T) { defer handler.Stop() d := &decompressor{ - logger: log.NewNopLogger(), - running: atomic.NewBool(false), - handler: handler, - path: file, - done: make(chan struct{}), - metrics: newMetrics(prometheus.NewRegistry()), - cfg: DecompressionConfig{Format: "gz"}, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: file, + done: make(chan struct{}), + metrics: newMetrics(prometheus.NewRegistry()), + cfg: DecompressionConfig{Format: "gz"}, } - d.readLines() + d.readLines(handler) <-d.done time.Sleep(time.Millisecond * 200) @@ -142,16 +148,16 @@ func TestOnelineFiles(t *testing.T) { defer handler.Stop() d := &decompressor{ - logger: log.NewNopLogger(), - running: atomic.NewBool(false), - handler: handler, - path: file, - done: make(chan struct{}), - metrics: newMetrics(prometheus.NewRegistry()), - cfg: DecompressionConfig{Format: "bz2"}, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: file, + done: make(chan struct{}), + metrics: newMetrics(prometheus.NewRegistry()), + cfg: DecompressionConfig{Format: "bz2"}, } - d.readLines() + d.readLines(handler) <-d.done time.Sleep(time.Millisecond * 200) @@ -167,16 +173,16 @@ func TestOnelineFiles(t *testing.T) { defer handler.Stop() d := &decompressor{ - logger: log.NewNopLogger(), - running: atomic.NewBool(false), - handler: handler, - path: file, - done: make(chan struct{}), - metrics: newMetrics(prometheus.NewRegistry()), - cfg: DecompressionConfig{Format: "gz"}, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: file, + done: make(chan struct{}), + metrics: newMetrics(prometheus.NewRegistry()), + cfg: DecompressionConfig{Format: "gz"}, } - d.readLines() + d.readLines(handler) <-d.done time.Sleep(time.Millisecond * 200) @@ -188,3 +194,60 @@ func TestOnelineFiles(t *testing.T) { require.Contains(t, firstEntry.Line, `5.202.214.160 - - [26/Jan/2019:19:45:25 +0330] "GET / HTTP/1.1" 200 30975 "https://www.zanbil.ir/" "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0" "-"`) }) } + +func TestDecompressor(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + l := util.TestLogger(t) + ch1 := loki.NewLogsReceiver() + tempDir := t.TempDir() + positionsFile, err := positions.New(l, positions.Config{ + SyncPeriod: 50 * time.Millisecond, 
+ PositionsFile: filepath.Join(tempDir, "positions.yaml"), + IgnoreInvalidYaml: false, + ReadOnly: false, + }) + require.NoError(t, err) + filename := "testdata/onelinelog.tar.gz" + labels := model.LabelSet{ + "filename": model.LabelValue(filename), + "foo": "bar", + } + decompressor, err := newDecompressor( + newMetrics(nil), + l, + ch1, + positionsFile, + filename, + labels, + "", + DecompressionConfig{Format: "gz"}, + ) + go decompressor.Run() + + select { + case logEntry := <-ch1.Chan(): + require.Contains(t, logEntry.Line, "onelinelog.log") + case <-time.After(1 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(filename, labels.String()) + assert.NoError(c, err) + assert.Equal(c, int64(1), pos) + }, time.Second, 50*time.Millisecond) + + decompressor.Stop() + + // Run the decompressor again + go decompressor.Run() + select { + case <-ch1.Chan(): + t.Fatal("no message should be sent because of the position file") + case <-time.After(1 * time.Second): + } + + decompressor.Stop() + + positionsFile.Stop() +} diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index dc1222e1fd..e1ca7a3063 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/alloy/internal/component/common/loki/positions" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runner" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/tail/watch" "github.com/prometheus/common/model" @@ -85,7 +86,9 @@ type Component struct { handler loki.LogsReceiver receivers []loki.LogsReceiver posFile positions.Positions - readers map[positions.Entry]reader + tasks map[positions.Entry]runnerTask + + updateReaders chan struct{} } // New creates a new loki.source.file component. @@ -113,10 +116,11 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, metrics: newMetrics(o.Registerer), - handler: loki.NewLogsReceiver(), - receivers: args.ForwardTo, - posFile: positionsFile, - readers: make(map[positions.Entry]reader), + handler: loki.NewLogsReceiver(), + receivers: args.ForwardTo, + posFile: positionsFile, + tasks: make(map[positions.Entry]runnerTask), + updateReaders: make(chan struct{}, 1), } // Call to Update() to start readers and set receivers once at the start. @@ -128,16 +132,16 @@ func New(o component.Options, args Arguments) (*Component, error) { } // Run implements component.Component. -// TODO(@tpaschalis). Should we periodically re-check? What happens if a target -// comes alive _after_ it's been passed to us and we never receive another -// Update()? Or should it be a responsibility of the discovery component? 
func (c *Component) Run(ctx context.Context) error { + runner := runner.New(func(t *runnerTask) runner.Worker { + return &runnerReader{ + reader: t.reader, + } + }) defer func() { level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping readers and positions file") c.mut.RLock() - for _, r := range c.readers { - r.Stop() - } + runner.Stop() c.posFile.Stop() close(c.handler.Chan()) c.mut.RUnlock() @@ -153,6 +157,14 @@ func (c *Component) Run(ctx context.Context) error { receiver.Chan() <- entry } c.mut.RUnlock() + case <-c.updateReaders: + c.mut.Lock() + var tasks []*runnerTask + for _, entry := range c.tasks { + tasks = append(tasks, &entry) + } + runner.ApplyTasks(ctx, tasks) + c.mut.Unlock() } } } @@ -162,28 +174,6 @@ func (c *Component) Update(args component.Arguments) error { c.updateMut.Lock() defer c.updateMut.Unlock() - // Stop all readers so we can recreate them below. This *must* be done before - // c.mut is held to avoid a race condition where stopping a reader is - // flushing its data, but the flush never succeeds because the Run goroutine - // fails to get a read lock. - // - // Stopping the readers avoids the issue we saw with stranded wrapped - // handlers staying behind until they were GC'ed and sending duplicate - // message to the global handler. It also makes sure that we update - // everything with the new labels. Simply zeroing out the c.readers map did - // not work correctly to shut down the wrapped handlers in time. - // - // TODO (@tpaschalis) We should be able to optimize this somehow and eg. - // cache readers for paths we already know about, and whose labels have not - // changed. Once we do that we should: - // - // * Call to c.pruneStoppedReaders to give cached but errored readers a - // chance to restart. - // * Stop tailing any files that were no longer in the new targets - // and conditionally remove their readers only by calling toStopTailing - // and c.stopTailingAndRemovePosition. - oldPaths := c.stopReaders() - newArgs := args.(Arguments) c.mut.Lock() @@ -191,11 +181,10 @@ func (c *Component) Update(args component.Arguments) error { c.args = newArgs c.receivers = newArgs.ForwardTo - c.readers = make(map[positions.Entry]reader) + c.tasks = make(map[positions.Entry]runnerTask) if len(newArgs.Targets) == 0 { level.Debug(c.opts.Logger).Log("msg", "no files targets were passed, nothing will be tailed") - return nil } for _, target := range newArgs.Targets { @@ -211,72 +200,47 @@ func (c *Component) Update(args component.Arguments) error { // Deduplicate targets which have the same public label set. readersKey := positions.Entry{Path: path, Labels: labels.String()} - if _, exist := c.readers[readersKey]; exist { + if _, exist := c.tasks[readersKey]; exist { continue } - c.reportSize(path, labels.String()) + c.reportSize(path) - handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {})) - reader, err := c.startTailing(path, labels, handler) + reader, err := c.createReader(path, labels) if err != nil { continue } - c.readers[readersKey] = readerWithHandler{ - reader: reader, - handler: handler, + c.tasks[readersKey] = runnerTask{ + reader: reader, + path: path, + labels: labels.String(), + // Could fastFingerPrint work? + readerHash: uint64(labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(path)}).Fingerprint()), } } - // Remove from the positions file any entries that had a Reader before, but - // are no longer in the updated set of Targets. 
- for r := range missing(c.readers, oldPaths) { - c.posFile.Remove(r.Path, r.Labels) + select { + case c.updateReaders <- struct{}{}: + default: } return nil } -// readerWithHandler combines a reader with an entry handler associated with -// it. Closing the reader will also close the handler. -type readerWithHandler struct { - reader - handler loki.EntryHandler -} - -func (r readerWithHandler) Stop() { - r.reader.Stop() - r.handler.Stop() -} - -// stopReaders stops existing readers and returns the set of paths which were -// stopped. -func (c *Component) stopReaders() map[positions.Entry]struct{} { - c.mut.RLock() - defer c.mut.RUnlock() - - stoppedPaths := make(map[positions.Entry]struct{}, len(c.readers)) - - for p, r := range c.readers { - stoppedPaths[p] = struct{}{} - r.Stop() - } - - return stoppedPaths -} - // DebugInfo returns information about the status of tailed targets. // TODO(@tpaschalis) Decorate with more debug information once it's made // available, such as the last time a log line was read. func (c *Component) DebugInfo() interface{} { + c.mut.Lock() + defer c.mut.Unlock() var res readerDebugInfo - for e, reader := range c.readers { + for e, task := range c.tasks { offset, _ := c.posFile.Get(e.Path, e.Labels) res.TargetsInfo = append(res.TargetsInfo, targetInfo{ Path: e.Path, Labels: e.Labels, - IsRunning: reader.IsRunning(), + IsRunning: task.reader.IsRunning(), ReadOffset: offset, }) } @@ -305,10 +269,9 @@ func missing(as map[positions.Entry]reader, bs map[positions.Entry]struct{}) map return c } -// startTailing starts and returns a reader for the given path. For most files, -// this will be a tailer implementation. If the file suffix alludes to it being -// a compressed file, then a decompressor will be started instead. -func (c *Component) startTailing(path string, labels model.LabelSet, handler loki.EntryHandler) (reader, error) { +// For most files, createReader returns a tailer implementation. If the file suffix alludes to it being +// a compressed file, then a decompressor will be created instead. 
+func (c *Component) createReader(path string, labels model.LabelSet) (reader, error) { fi, err := os.Stat(path) if err != nil { level.Error(c.opts.Logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", path) @@ -324,24 +287,22 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok var reader reader if c.args.DecompressionConfig.Enabled { - level.Debug(c.opts.Logger).Log("msg", "reading from compressed file", "filename", path) decompressor, err := newDecompressor( c.metrics, c.opts.Logger, - handler, + c.handler, c.posFile, path, - labels.String(), + labels, c.args.Encoding, c.args.DecompressionConfig, ) if err != nil { - level.Error(c.opts.Logger).Log("msg", "failed to start decompressor", "error", err, "filename", path) - return nil, fmt.Errorf("failed to start decompressor %s", err) + level.Error(c.opts.Logger).Log("msg", "failed to create decompressor", "error", err, "filename", path) + return nil, fmt.Errorf("failed to create decompressor %s", err) } reader = decompressor } else { - level.Debug(c.opts.Logger).Log("msg", "tailing new file", "filename", path) pollOptions := watch.PollingFileWatcherOptions{ MinPollFrequency: c.args.FileWatch.MinPollFrequency, MaxPollFrequency: c.args.FileWatch.MaxPollFrequency, @@ -349,17 +310,17 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok tailer, err := newTailer( c.metrics, c.opts.Logger, - handler, + c.handler, c.posFile, path, - labels.String(), + labels, c.args.Encoding, pollOptions, c.args.TailFromEnd, ) if err != nil { - level.Error(c.opts.Logger).Log("msg", "failed to start tailer", "error", err, "filename", path) - return nil, fmt.Errorf("failed to start tailer %s", err) + level.Error(c.opts.Logger).Log("msg", "failed to create tailer", "error", err, "filename", path) + return nil, fmt.Errorf("failed to create tailer %s", err) } reader = tailer } @@ -367,21 +328,10 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok return reader, nil } -func (c *Component) reportSize(path, labels string) { - // Ask the reader to update the size if a reader exists, this keeps - // position and size metrics in sync. - if reader, ok := c.readers[positions.Entry{Path: path, Labels: labels}]; ok { - err := reader.MarkPositionAndSize() - if err != nil { - level.Warn(c.opts.Logger).Log("msg", "failed to get file size from existing reader, ", "file", path, "error", err) - return - } - } else { - // Must be a new file, just directly read the size of it - fi, err := os.Stat(path) - if err != nil { - return - } - c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size())) +func (c *Component) reportSize(path string) { + fi, err := os.Stat(path) + if err != nil { + return } + c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size())) } diff --git a/internal/component/loki/source/file/file_test.go b/internal/component/loki/source/file/file_test.go index fcd418b6f1..643f2d915c 100644 --- a/internal/component/loki/source/file/file_test.go +++ b/internal/component/loki/source/file/file_test.go @@ -314,3 +314,69 @@ func TestEncoding(t *testing.T) { "expected positions.yml file to be written eventually", ) } + +func TestDeleteRecreateFile(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + filename := "example" + + ctx, cancel := context.WithCancel(componenttest.TestContext(t)) + defer cancel() + + // Create file to log to. 
+ f, err := os.Create(filename) + require.NoError(t, err) + + ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file") + require.NoError(t, err) + + ch1 := loki.NewLogsReceiver() + + go func() { + err := ctrl.Run(ctx, Arguments{ + Targets: []discovery.Target{{ + "__path__": f.Name(), + "foo": "bar", + }}, + ForwardTo: []loki.LogsReceiver{ch1}, + }) + require.NoError(t, err) + }() + + ctrl.WaitRunning(time.Minute) + + _, err = f.Write([]byte("writing some text\n")) + require.NoError(t, err) + + wantLabelSet := model.LabelSet{ + "filename": model.LabelValue(f.Name()), + "foo": "bar", + } + + checkMsg(t, ch1, "writing some text", 5*time.Second, wantLabelSet) + + require.NoError(t, f.Close()) + require.NoError(t, os.Remove(f.Name())) + + // Create a file with the same name + f, err = os.Create(filename) + require.NoError(t, err) + defer os.Remove(f.Name()) + defer f.Close() + + _, err = f.Write([]byte("writing some new text\n")) + require.NoError(t, err) + + checkMsg(t, ch1, "writing some new text", 5*time.Second, wantLabelSet) +} + +func checkMsg(t *testing.T, ch loki.LogsReceiver, msg string, timeout time.Duration, labelSet model.LabelSet) { + select { + case logEntry := <-ch.Chan(): + require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) + require.Equal(t, msg, logEntry.Line) + require.Equal(t, labelSet, logEntry.Labels) + case <-time.After(timeout): + require.FailNow(t, "failed waiting for log line") + } +} diff --git a/internal/component/loki/source/file/reader.go b/internal/component/loki/source/file/reader.go index 04016dc6c1..6e1703cd3a 100644 --- a/internal/component/loki/source/file/reader.go +++ b/internal/component/loki/source/file/reader.go @@ -5,8 +5,8 @@ package file // reader contains the set of methods the loki.source.file component uses. type reader interface { + Run() Stop() IsRunning() bool Path() string - MarkPositionAndSize() error } diff --git a/internal/component/loki/source/file/runner.go b/internal/component/loki/source/file/runner.go new file mode 100644 index 0000000000..f0e81859bf --- /dev/null +++ b/internal/component/loki/source/file/runner.go @@ -0,0 +1,66 @@ +package file + +import ( + "context" + "time" + + "github.com/grafana/alloy/internal/runner" + "github.com/grafana/dskit/backoff" +) + +var _ runner.Task = (*runnerTask)(nil) + +type runnerTask struct { + reader reader + path string + labels string + readerHash uint64 +} + +func (r *runnerTask) Hash() uint64 { + return r.readerHash +} + +func (r *runnerTask) Equals(other runner.Task) bool { + otherTask := other.(*runnerTask) + + if r == otherTask { + return true + } + + return r.readerHash == otherTask.readerHash +} + +// runnerReader is a wrapper around a reader (tailer or decompressor) +// It is responsible for running the runner. If the reader stops running, +// it will retry it after a few seconds. This is useful to handle log file rotation +// when a file might be gone for a very short amount of time. +// The runner is only stopped when the corresponding target is gone or when the component is stopped. 
+type runnerReader struct { + reader reader +} + +func (r *runnerReader) Run(ctx context.Context) { + backoff := backoff.New( + ctx, + backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: 0, + }, + ) + + go func() { + <-ctx.Done() + r.reader.Stop() + }() + + for { + r.reader.Run() + backoff.Wait() + if !backoff.Ongoing() { + break + } + } + +} diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index d925d2ec5c..8f1cde1aa0 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -1,6 +1,6 @@ package file -// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// This code is adapted from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. // tailer implements the reader interface by using the github.com/grafana/tail package to tail files. import ( @@ -31,81 +31,43 @@ import ( type tailer struct { metrics *metrics logger log.Logger - handler loki.EntryHandler + receiver loki.LogsReceiver positions positions.Positions - path string - labels string - tail *tail.Tail + path string + labelsStr string + labels model.LabelSet + tail *tail.Tail + tailFromEnd bool + pollOptions watch.PollingFileWatcherOptions posAndSizeMtx sync.Mutex - stopOnce sync.Once running *atomic.Bool - posquit chan struct{} - posdone chan struct{} - done chan struct{} + posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop + posdone chan struct{} // used by the updatePosition method to notify when it stopped + done chan struct{} // used by the readLine method to notify when it stopped decoder *encoding.Decoder } -func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, - labels string, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) { - // Simple check to make sure the file we are tailing doesn't - // have a position already saved which is past the end of the file. - fi, err := os.Stat(path) - if err != nil { - return nil, err - } - pos, err := positions.Get(path, labels) - if err != nil { - return nil, err - } - - if fi.Size() < pos { - positions.Remove(path, labels) - } - - // If no cached position is found and the tailFromEnd option is enabled. 
- if pos == 0 && tailFromEnd { - pos, err = getLastLinePosition(path) - if err != nil { - level.Error(logger).Log("msg", "failed to get a position from the end of the file, default to start of file", err) - } else { - positions.Put(path, labels, pos) - level.Info(logger).Log("msg", "retrieved and stored the position of the last line") - } - } - - tail, err := tail.TailFile(path, tail.Config{ - Follow: true, - Poll: true, - ReOpen: true, - MustExist: true, - Location: &tail.SeekInfo{ - Offset: pos, - Whence: 0, - }, - Logger: util.NewLogAdapter(logger), - PollOptions: pollOptions, - }) - if err != nil { - return nil, err - } +func newTailer(metrics *metrics, logger log.Logger, receiver loki.LogsReceiver, positions positions.Positions, path string, + labels model.LabelSet, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) { - logger = log.With(logger, "component", "tailer") tailer := &tailer{ - metrics: metrics, - logger: logger, - handler: loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler), - positions: positions, - path: path, - labels: labels, - tail: tail, - running: atomic.NewBool(false), - posquit: make(chan struct{}), - posdone: make(chan struct{}), - done: make(chan struct{}), + metrics: metrics, + logger: log.With(logger, "component", "tailer"), + receiver: receiver, + positions: positions, + path: path, + labels: labels, + labelsStr: labels.String(), + running: atomic.NewBool(false), + tailFromEnd: tailFromEnd, + pollOptions: pollOptions, + posquit: make(chan struct{}), + posdone: make(chan struct{}), + done: make(chan struct{}), } if encoding != "" { @@ -118,9 +80,6 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p tailer.decoder = decoder } - go tailer.readLines() - go tailer.updatePosition() - metrics.filesActive.Add(1.) return tailer, nil } @@ -180,10 +139,67 @@ func getLastLinePosition(path string) (int64, error) { } } +func (t *tailer) Run() { + fi, err := os.Stat(t.path) + if err != nil { + level.Error(t.logger).Log("msg", "failed to tail file", "path", t.path, "err", err) + return + } + pos, err := t.positions.Get(t.path, t.labelsStr) + if err != nil { + level.Error(t.logger).Log("msg", "failed to get file position", "err", err) + return + } + + if fi.Size() < pos { + t.positions.Remove(t.path, t.labelsStr) + } + + // If no cached position is found and the tailFromEnd option is enabled. 
+ if pos == 0 && t.tailFromEnd { + pos, err = getLastLinePosition(t.path) + if err != nil { + level.Error(t.logger).Log("msg", "failed to get a position from the end of the file, default to start of file", err) + } else { + t.positions.Put(t.path, t.labelsStr, pos) + level.Info(t.logger).Log("msg", "retrieved and stored the position of the last line") + } + } + labelsMiddleware := t.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(t.path)}) + handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(t.receiver.Chan(), func() {})) + defer handler.Stop() + + tail, err := tail.TailFile(t.path, tail.Config{ + Follow: true, + Poll: true, + ReOpen: true, + MustExist: true, + Location: &tail.SeekInfo{ + Offset: pos, + Whence: 0, + }, + Logger: util.NewLogAdapter(t.logger), + PollOptions: t.pollOptions, + }) + if err != nil { + level.Error(t.logger).Log("msg", "failed to tail the file", "err", err) + return + } + t.tail = tail + + t.posquit = make(chan struct{}) + t.posdone = make(chan struct{}) + t.done = make(chan struct{}) + + go t.updatePosition() + t.metrics.filesActive.Add(1.) + t.readLines(handler) +} + // updatePosition is run in a goroutine and checks the current size of the file // and saves it to the positions file at a regular interval. If there is ever // an error it stops the tailer and exits, the tailer will be re-opened by the -// filetarget sync method if it still exists and will start reading from the +// backoff retry method if it still exists and will start reading from the // last successful entry in the positions file. func (t *tailer) updatePosition() { positionSyncPeriod := t.positions.SyncPeriod() @@ -191,7 +207,7 @@ func (t *tailer) updatePosition() { defer func() { positionWait.Stop() level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path) - // NOTE: metrics must be cleaned up after the position timer exits, as MarkPositionAndSize() updates metrics. + // NOTE: metrics must be cleaned up after the position timer exits, as markPositionAndSize() updates metrics. t.cleanupMetrics() close(t.posdone) }() @@ -199,7 +215,7 @@ func (t *tailer) updatePosition() { for { select { case <-positionWait.C: - err := t.MarkPositionAndSize() + err := t.markPositionAndSize() if err != nil { level.Error(t.logger).Log("msg", "position timer: error getting tail position and/or size, stopping tailer", "path", t.path, "error", err) err := t.tail.Stop() @@ -214,19 +230,17 @@ func (t *tailer) updatePosition() { } } -// readLines runs in a goroutine and consumes the t.tail.Lines channel from the -// underlying tailer. Et will only exit when that channel is closed. This is +// readLines consumes the t.tail.Lines channel from the +// underlying tailer. It will only exit when that channel is closed. This is // important to avoid a deadlock in the underlying tailer which can happen if // there are unread lines in this channel and the Stop method on the tailer is // called, the underlying tailer will never exit if there are unread lines in // the t.tail.Lines channel -func (t *tailer) readLines() { +func (t *tailer) readLines(handler loki.EntryHandler) { level.Info(t.logger).Log("msg", "tail routine: started", "path", t.path) t.running.Store(true) - // This function runs in a goroutine, if it exits this tailer will never do any more tailing. - // Clean everything up. 
defer func() { t.running.Store(false) level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path) @@ -234,7 +248,7 @@ func (t *tailer) readLines() { // Shut down the position marker thread close(t.posquit) }() - entries := t.handler.Chan() + entries := handler.Chan() for { line, ok := <-t.tail.Lines if !ok { @@ -272,8 +286,8 @@ func (t *tailer) readLines() { } } -func (t *tailer) MarkPositionAndSize() error { - // Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file. +func (t *tailer) markPositionAndSize() error { + // Lock this update because it can be called in two different threads t.posAndSizeMtx.Lock() defer t.posAndSizeMtx.Unlock() @@ -295,40 +309,35 @@ func (t *tailer) MarkPositionAndSize() error { // Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped. t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size)) t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos)) - t.positions.Put(t.path, t.labels, pos) + t.positions.Put(t.path, t.labelsStr, pos) return nil } func (t *tailer) Stop() { - // stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once - // we wrap the stop in a sync.Once. - t.stopOnce.Do(func() { - // Save the current position before shutting down tailer - err := t.MarkPositionAndSize() - if err != nil { - level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err) - } + // Save the current position before shutting down tailer + err := t.markPositionAndSize() + if err != nil { + level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err) + } - // Stop the underlying tailer to prevent resource leak. - err = t.tail.Stop() - if err != nil { - if utils.IsEphemeralOrFileClosed(err) { - // Don't log as error if the file is already closed, or we got an ephemeral error - it's a common case - // when files are rotating while being read and the tailer would have stopped correctly anyway. - level.Debug(t.logger).Log("msg", "tailer stopped with file I/O error", "path", t.path, "error", err) - } else { - // Log as error for other reasons, as a resource leak may have happened. - level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", err) - } + // Stop the underlying tailer to prevent resource leak. + err = t.tail.Stop() + if err != nil { + if utils.IsEphemeralOrFileClosed(err) { + // Don't log as error if the file is already closed, or we got an ephemeral error - it's a common case + // when files are rotating while being read and the tailer would have stopped correctly anyway. + level.Debug(t.logger).Log("msg", "tailer stopped with file I/O error", "path", t.path, "error", err) + } else { + // Log as error for other reasons, as a resource leak may have happened. 
+ level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", err) } - // Wait for readLines() to consume all the remaining messages and exit when the channel is closed - <-t.done - // Wait for the position marker thread to exit - <-t.posdone - level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) - t.handler.Stop() - }) + } + // Wait for readLines() to consume all the remaining messages and exit when the channel is closed + <-t.done + // Wait for the position marker thread to exit + <-t.posdone + level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) } func (t *tailer) IsRunning() bool { diff --git a/internal/component/loki/source/file/tailer_test.go b/internal/component/loki/source/file/tailer_test.go index b023ea631f..dcc842241a 100644 --- a/internal/component/loki/source/file/tailer_test.go +++ b/internal/component/loki/source/file/tailer_test.go @@ -3,7 +3,18 @@ package file import ( "bytes" "os" + "path/filepath" "testing" + "time" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/common/loki/positions" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/tail/watch" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" ) func createTempFileWithContent(t *testing.T, content []byte) string { @@ -78,3 +89,83 @@ func TestGetLastLinePosition(t *testing.T) { }) } } + +func TestTailer(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + l := util.TestLogger(t) + ch1 := loki.NewLogsReceiver() + tempDir := t.TempDir() + logFile, err := os.CreateTemp(tempDir, "example") + require.NoError(t, err) + positionsFile, err := positions.New(l, positions.Config{ + SyncPeriod: 50 * time.Millisecond, + PositionsFile: filepath.Join(tempDir, "positions.yaml"), + IgnoreInvalidYaml: false, + ReadOnly: false, + }) + require.NoError(t, err) + labels := model.LabelSet{ + "filename": model.LabelValue(logFile.Name()), + "foo": "bar", + } + tailer, err := newTailer( + newMetrics(nil), + l, + ch1, + positionsFile, + logFile.Name(), + labels, + "", + watch.PollingFileWatcherOptions{ + MinPollFrequency: 25 * time.Millisecond, + MaxPollFrequency: 25 * time.Millisecond, + }, + false, + ) + go tailer.Run() + + _, err = logFile.Write([]byte("writing some text\n")) + require.NoError(t, err) + select { + case logEntry := <-ch1.Chan(): + require.Equal(t, "writing some text", logEntry.Line) + case <-time.After(1 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(logFile.Name(), labels.String()) + assert.NoError(c, err) + assert.Equal(c, int64(18), pos) + }, time.Second, 50*time.Millisecond) + + tailer.Stop() + + // Run the tailer again + go tailer.Run() + select { + case <-ch1.Chan(): + t.Fatal("no message should be sent because of the position file") + case <-time.After(1 * time.Second): + } + + // Write logs again + _, err = logFile.Write([]byte("writing some new text\n")) + require.NoError(t, err) + select { + case logEntry := <-ch1.Chan(): + require.Equal(t, "writing some new text", logEntry.Line) + case <-time.After(1 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + + tailer.Stop() + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(logFile.Name(), labels.String()) + 
assert.NoError(c, err) + assert.Equal(c, int64(40), pos) + }, time.Second, 50*time.Millisecond) + + positionsFile.Stop() +} From 20fa27e4e846b07c2e61eae6ec9743347e54d690 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 16 Jan 2025 15:51:49 +0100 Subject: [PATCH 2/6] Update internal/component/loki/source/file/runner.go Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> --- internal/component/loki/source/file/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/loki/source/file/runner.go b/internal/component/loki/source/file/runner.go index f0e81859bf..fd65a55c47 100644 --- a/internal/component/loki/source/file/runner.go +++ b/internal/component/loki/source/file/runner.go @@ -32,7 +32,7 @@ func (r *runnerTask) Equals(other runner.Task) bool { } // runnerReader is a wrapper around a reader (tailer or decompressor) -// It is responsible for running the runner. If the reader stops running, +// It is responsible for running the reader. If the reader stops running, // it will retry it after a few seconds. This is useful to handle log file rotation // when a file might be gone for a very short amount of time. // The runner is only stopped when the corresponding target is gone or when the component is stopped. From ab594113cf2c196ae8e72c4dbc5d2c772895b89a Mon Sep 17 00:00:00 2001 From: William Dumont Date: Fri, 17 Jan 2025 09:53:35 +0100 Subject: [PATCH 3/6] Update internal/component/loki/source/file/tailer.go Co-authored-by: Sam DeHaan --- internal/component/loki/source/file/tailer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 8f1cde1aa0..2edf6f62b7 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -287,7 +287,7 @@ func (t *tailer) readLines(handler loki.EntryHandler) { } func (t *tailer) markPositionAndSize() error { - // Lock this update because it can be called in two different threads + // Lock this update because it can be called in two different goroutines t.posAndSizeMtx.Lock() defer t.posAndSizeMtx.Unlock() From 6adc8e0866eb3b1cb6721b266691efba9b8d04b6 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Fri, 17 Jan 2025 10:01:06 +0100 Subject: [PATCH 4/6] Update internal/component/loki/source/file/decompresser.go Co-authored-by: Sam DeHaan --- internal/component/loki/source/file/decompresser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index 7b000dfd72..2292f08da4 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -275,7 +275,7 @@ func (d *decompressor) readLines(handler loki.EntryHandler) { } func (d *decompressor) markPositionAndSize() error { - // Lock this update because it can be called in two different threads + // Lock this update because it can be called in two different goroutines d.posAndSizeMtx.Lock() defer d.posAndSizeMtx.Unlock() From 035b4631bc52737b005487c7d9c28d72c94d4f76 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Fri, 17 Jan 2025 12:35:26 +0100 Subject: [PATCH 5/6] review feedback --- .../loki/source/file/decompresser.go | 48 ++++++++++++++----- internal/component/loki/source/file/file.go | 2 +- internal/component/loki/source/file/runner.go | 7 +++ internal/component/loki/source/file/tailer.go | 48 
+++++++++++++++---- 4 files changed, 84 insertions(+), 21 deletions(-) diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index 7b000dfd72..1dc6e60608 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -1,6 +1,6 @@ package file -// This code is adapted from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// This code is adapted from loki/promtail. Last revision used to port changes to Alloy was a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. // Decompressor implements the reader interface and is used to read compressed log files. // It uses the Go stdlib's compress/* packages for decoding. @@ -50,18 +50,21 @@ type decompressor struct { labels model.LabelSet labelsStr string - posAndSizeMtx sync.Mutex + posAndSizeMtx sync.RWMutex running *atomic.Bool - posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop - posdone chan struct{} // used by the updatePosition method to notify when it stopped - done chan struct{} // used by the readLine method to notify when it stopped decoder *encoding.Decoder position int64 size int64 cfg DecompressionConfig + + mut sync.RWMutex + stopping bool + posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop + posdone chan struct{} // used by the updatePosition method to notify when it stopped + done chan struct{} // used by the readLine method to notify when it stopped } func newDecompressor( @@ -152,12 +155,23 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade } func (d *decompressor) Run() { + d.mut.Lock() + + // Check if the stop function was called between two Run. + if d.stopping { + close(d.done) + close(d.posdone) + return + } + labelsMiddleware := d.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(d.path)}) handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {})) defer handler.Stop() d.posquit = make(chan struct{}) d.posdone = make(chan struct{}) d.done = make(chan struct{}) + d.mut.Unlock() + go d.updatePosition() d.metrics.filesActive.Add(1.) d.readLines(handler) @@ -240,10 +254,13 @@ func (d *decompressor) readLines(handler loki.EntryHandler) { break } + d.posAndSizeMtx.RLock() if line <= int(d.position) { // skip already seen lines. 
+ d.posAndSizeMtx.RUnlock() continue } + d.posAndSizeMtx.RUnlock() text := scanner.Text() var finalText string @@ -269,15 +286,17 @@ func (d *decompressor) readLines(handler loki.EntryHandler) { }, } + d.posAndSizeMtx.Lock() d.size = int64(unsafe.Sizeof(finalText)) d.position++ + d.posAndSizeMtx.Unlock() } } func (d *decompressor) markPositionAndSize() error { // Lock this update because it can be called in two different threads - d.posAndSizeMtx.Lock() - defer d.posAndSizeMtx.Unlock() + d.posAndSizeMtx.RLock() + defer d.posAndSizeMtx.RUnlock() d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size)) d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position)) @@ -287,18 +306,25 @@ func (d *decompressor) markPositionAndSize() error { } func (d *decompressor) Stop() { + d.mut.RLock() + d.stopping = true + defer func() { + d.stopping = false + }() + d.mut.RUnlock() + // Shut down the position marker thread close(d.posquit) <-d.posdone + // Wait for readLines() to consume all the remaining messages and exit when the channel is closed + <-d.done + level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path) + // Save the current position before shutting down reader if err := d.markPositionAndSize(); err != nil { level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err) } - - // Wait for readLines() to consume all the remaining messages and exit when the channel is closed - <-d.done - level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path) } func (d *decompressor) IsRunning() bool { diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index e1ca7a3063..5b11050932 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -215,7 +215,7 @@ func (c *Component) Update(args component.Arguments) error { reader: reader, path: path, labels: labels.String(), - // Could fastFingerPrint work? + // TODO: Could fastFingerPrint work? readerHash: uint64(labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(path)}).Fingerprint()), } } diff --git a/internal/component/loki/source/file/runner.go b/internal/component/loki/source/file/runner.go index f0e81859bf..02ccea0147 100644 --- a/internal/component/loki/source/file/runner.go +++ b/internal/component/loki/source/file/runner.go @@ -2,6 +2,7 @@ package file import ( "context" + "sync" "time" "github.com/grafana/alloy/internal/runner" @@ -50,7 +51,11 @@ func (r *runnerReader) Run(ctx context.Context) { }, ) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() <-ctx.Done() r.reader.Stop() }() @@ -63,4 +68,6 @@ func (r *runnerReader) Run(ctx context.Context) { } } + // Wait for the stop function to exit to ensure that the reader was properly stopped. + wg.Wait() } diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 8f1cde1aa0..3bbdfa084f 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -1,6 +1,6 @@ package file -// This code is adapted from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// This code is adapted from loki/promtail. Last revision used to port changes to Alloy was a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. // tailer implements the reader interface by using the github.com/grafana/tail package to tail files. 
 import (
@@ -34,19 +34,23 @@ type tailer struct {
 	receiver  loki.LogsReceiver
 	positions positions.Positions
 
-	path        string
-	labelsStr   string
-	labels      model.LabelSet
-	tail        *tail.Tail
+	path      string
+	labelsStr string
+	labels    model.LabelSet
+
 	tailFromEnd bool
 	pollOptions watch.PollingFileWatcherOptions
 
 	posAndSizeMtx sync.Mutex
 
 	running *atomic.Bool
-	posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
-	posdone chan struct{} // used by the updatePosition method to notify when it stopped
-	done    chan struct{} // used by the readLine method to notify when it stopped
+
+	mut      sync.RWMutex
+	stopping bool
+	tail     *tail.Tail
+	posquit  chan struct{} // used by the readLine method to tell the updatePosition method to stop
+	posdone  chan struct{} // used by the updatePosition method to notify when it stopped
+	done     chan struct{} // used by the readLine method to notify when it stopped
 
 	decoder *encoding.Decoder
 }
@@ -140,6 +144,16 @@ func getLastLinePosition(path string) (int64, error) {
 }
 
 func (t *tailer) Run() {
+	t.mut.Lock()
+
+	// Check if the stop function was called between two Run calls.
+	if t.stopping {
+		close(t.done)
+		close(t.posdone)
+		close(t.posquit)
+		return
+	}
+
 	fi, err := os.Stat(t.path)
 	if err != nil {
 		level.Error(t.logger).Log("msg", "failed to tail file", "path", t.path, "err", err)
@@ -151,6 +165,11 @@ func (t *tailer) Run() {
 		return
 	}
 
+	// NOTE: The code assumes that if a position is available and the file is bigger than the position, then
+	// the tail should start from that position. This may not always be desired in situations where the file was rotated
+	// and replaced by a file with the same name but different content and a bigger size than the previous one. This problem
+	// mostly shows up on Windows because on Unix systems the readLines function does not exit on file rotation.
+	// If this ever becomes a problem, we may want to consider saving and comparing file creation timestamps.
 	if fi.Size() < pos {
 		t.positions.Remove(t.path, t.labelsStr)
 	}
@@ -190,6 +209,7 @@ func (t *tailer) Run() {
 	t.posquit = make(chan struct{})
 	t.posdone = make(chan struct{})
 	t.done = make(chan struct{})
+	t.mut.Unlock()
 
 	go t.updatePosition()
 	t.metrics.filesActive.Add(1.)
@@ -315,6 +335,12 @@ func (t *tailer) markPositionAndSize() error {
 }
 
 func (t *tailer) Stop() {
+	t.mut.RLock()
+	t.stopping = true
+	defer func() {
+		t.stopping = false
+	}()
+
 	// Save the current position before shutting down tailer
 	err := t.markPositionAndSize()
 	if err != nil {
@@ -322,7 +348,11 @@ func (t *tailer) Stop() {
 	}
 
 	// Stop the underlying tailer to prevent resource leak.
- err = t.tail.Stop() + if t.tail != nil { + err = t.tail.Stop() + } + t.mut.RUnlock() + if err != nil { if utils.IsEphemeralOrFileClosed(err) { // Don't log as error if the file is already closed, or we got an ephemeral error - it's a common case From 52b287e05bc963148b302f09f6ab02b0c52eeb69 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Mon, 20 Jan 2025 09:53:34 +0100 Subject: [PATCH 6/6] add missing mut unlock --- internal/component/loki/source/file/decompresser.go | 1 + internal/component/loki/source/file/tailer.go | 1 + 2 files changed, 2 insertions(+) diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index db78da36bb..de22a31791 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -161,6 +161,7 @@ func (d *decompressor) Run() { if d.stopping { close(d.done) close(d.posdone) + d.mut.Unlock() return } diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index c477eb306f..7855003cfa 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -151,6 +151,7 @@ func (t *tailer) Run() { close(t.done) close(t.posdone) close(t.posquit) + t.mut.Unlock() return }