diff --git a/CHANGELOG.md b/CHANGELOG.md index f238aadb97..0d4b8be07c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,10 @@ Main (unreleased) - Add json format support for log export via faro receiver (@ravishankar15) +### Bugfixes + +- Fix log rotation for Windows in `loki.source.file` by refactoring the component to use the runner pkg. This should also reduce CPU consumption when tailing a lot of files in a dynamic environment. (@wildum) + - Add livedebugging support for `prometheus.remote_write` (@ravishankar15) v1.6.0 diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index fd565d19dd..aa46f79d89 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -1,6 +1,6 @@ package file -// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// This code is adapted from loki/promtail. Last revision used to port changes to Alloy was a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. // Decompressor implements the reader interface and is used to read compressed log files. // It uses the Go stdlib's compress/* packages for decoding. @@ -43,41 +43,49 @@ func supportedCompressedFormats() map[string]struct{} { type decompressor struct { metrics *metrics logger log.Logger - handler loki.EntryHandler + receiver loki.LogsReceiver positions positions.Positions - path string - labels string + path string + labels model.LabelSet + labelsStr string - posAndSizeMtx sync.Mutex - stopOnce sync.Once + posAndSizeMtx sync.RWMutex running *atomic.Bool - posquit chan struct{} - posdone chan struct{} - done chan struct{} decoder *encoding.Decoder position int64 size int64 cfg DecompressionConfig + + componentStopping func() bool + + mut sync.RWMutex + stopping bool + posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop + posdone chan struct{} // used by the updatePosition method to notify when it stopped + done chan struct{} // used by the readLine method to notify when it stopped } func newDecompressor( metrics *metrics, logger log.Logger, - handler loki.EntryHandler, + receiver loki.LogsReceiver, positions positions.Positions, path string, - labels string, + labels model.LabelSet, encodingFormat string, cfg DecompressionConfig, + componentStopping func() bool, ) (*decompressor, error) { + labelsStr := labels.String() + logger = log.With(logger, "component", "decompressor") - pos, err := positions.Get(path, labels) + pos, err := positions.Get(path, labelsStr) if err != nil { return nil, fmt.Errorf("failed to get positions: %w", err) } @@ -93,24 +101,23 @@ func newDecompressor( } decompressor := &decompressor{ - metrics: metrics, - logger: logger, - handler: loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler), - positions: positions, - path: path, - labels: labels, - running: atomic.NewBool(false), - posquit: make(chan struct{}), - posdone: make(chan struct{}), - done: make(chan struct{}), - position: pos, - decoder: decoder, - cfg: cfg, + metrics: metrics, + logger: logger, + receiver: receiver, + positions: positions, + path: path, + labels: labels, + labelsStr: labelsStr, + running: atomic.NewBool(false), + posquit: make(chan struct{}), + posdone: make(chan struct{}), + done: make(chan struct{}), + position: pos, + decoder: decoder, + cfg: cfg, + componentStopping: componentStopping, } - go decompressor.readLines() - go decompressor.updatePosition() - 
metrics.filesActive.Add(1.) return decompressor, nil } @@ -151,6 +158,30 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade return reader, nil } +func (d *decompressor) Run() { + d.mut.Lock() + + // Check if the stop function was called between two Run. + if d.stopping { + close(d.done) + close(d.posdone) + d.mut.Unlock() + return + } + + labelsMiddleware := d.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(d.path)}) + handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {})) + defer handler.Stop() + d.posquit = make(chan struct{}) + d.posdone = make(chan struct{}) + d.done = make(chan struct{}) + d.mut.Unlock() + + go d.updatePosition() + d.metrics.filesActive.Add(1.) + d.readLines(handler) +} + func (d *decompressor) updatePosition() { positionSyncPeriod := d.positions.SyncPeriod() positionWait := time.NewTicker(positionSyncPeriod) @@ -163,7 +194,7 @@ func (d *decompressor) updatePosition() { for { select { case <-positionWait.C: - if err := d.MarkPositionAndSize(); err != nil { + if err := d.markPositionAndSize(); err != nil { level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err) return } @@ -178,7 +209,7 @@ func (d *decompressor) updatePosition() { // It first decompresses the file as a whole using a reader and then it will iterate // over its chunks, separated by '\n'. // During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp. -func (d *decompressor) readLines() { +func (d *decompressor) readLines(handler loki.EntryHandler) { level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path) d.running.Store(true) @@ -188,11 +219,12 @@ func (d *decompressor) readLines() { } defer func() { + d.running.Store(false) d.cleanupMetrics() level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path) close(d.done) }() - entries := d.handler.Chan() + entries := handler.Chan() f, err := os.Open(d.path) if err != nil { @@ -227,10 +259,13 @@ func (d *decompressor) readLines() { break } + d.posAndSizeMtx.RLock() if line <= d.position { // skip already seen lines. + d.posAndSizeMtx.RUnlock() continue } + d.posAndSizeMtx.RUnlock() text := scanner.Text() var finalText string @@ -256,41 +291,51 @@ func (d *decompressor) readLines() { }, } + d.posAndSizeMtx.Lock() d.size = int64(unsafe.Sizeof(finalText)) d.position++ + d.posAndSizeMtx.Unlock() } } -func (d *decompressor) MarkPositionAndSize() error { - // Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file. - d.posAndSizeMtx.Lock() - defer d.posAndSizeMtx.Unlock() +func (d *decompressor) markPositionAndSize() error { + // Lock this update because it can be called in two different goroutines + d.posAndSizeMtx.RLock() + defer d.posAndSizeMtx.RUnlock() d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size)) d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position)) - d.positions.Put(d.path, d.labels, d.position) + d.positions.Put(d.path, d.labelsStr, d.position) return nil } func (d *decompressor) Stop() { - // stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once - // we wrap the stop in a sync.Once. 
- d.stopOnce.Do(func() { - // Shut down the position marker thread - close(d.posquit) - <-d.posdone + d.mut.RLock() + d.stopping = true + defer func() { + d.stopping = false + }() + d.mut.RUnlock() + // Shut down the position marker thread + close(d.posquit) + <-d.posdone + + // Wait for readLines() to consume all the remaining messages and exit when the channel is closed + <-d.done + level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path) + + // If the component is not stopping, then it means that the target for this component is gone and that + // we should clear the entry from the positions file. + if !d.componentStopping() { + d.positions.Remove(d.path, d.labelsStr) + } else { // Save the current position before shutting down reader - if err := d.MarkPositionAndSize(); err != nil { + if err := d.markPositionAndSize(); err != nil { level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err) } - - // Wait for readLines() to consume all the remaining messages and exit when the channel is closed - <-d.done - level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path) - d.handler.Stop() - }) + } } func (d *decompressor) IsRunning() bool { diff --git a/internal/component/loki/source/file/decompresser_test.go b/internal/component/loki/source/file/decompresser_test.go index 204fa55a0e..2eed358599 100644 --- a/internal/component/loki/source/file/decompresser_test.go +++ b/internal/component/loki/source/file/decompresser_test.go @@ -5,17 +5,23 @@ package file import ( "os" + "path/filepath" "sync" "testing" "time" "github.com/grafana/alloy/internal/component/common/loki/client/fake" + "github.com/grafana/alloy/internal/component/common/loki/positions" + "github.com/grafana/alloy/internal/util" "github.com/go-kit/log" "github.com/grafana/alloy/internal/component/common/loki" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "go.uber.org/goleak" ) type noopClient struct { @@ -64,17 +70,17 @@ func BenchmarkReadlines(b *testing.B) { for _, tc := range scenarios { b.Run(tc.name, func(b *testing.B) { decBase := &decompressor{ - logger: log.NewNopLogger(), - running: atomic.NewBool(false), - handler: entryHandler, - path: tc.file, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: tc.file, } for i := 0; i < b.N; i++ { newDec := decBase newDec.metrics = newMetrics(prometheus.NewRegistry()) newDec.done = make(chan struct{}) - newDec.readLines() + newDec.readLines(entryHandler) <-newDec.done } }) @@ -87,16 +93,16 @@ func TestGigantiqueGunzipFile(t *testing.T) { defer handler.Stop() d := &decompressor{ - logger: log.NewNopLogger(), - running: atomic.NewBool(false), - handler: handler, - path: file, - done: make(chan struct{}), - metrics: newMetrics(prometheus.NewRegistry()), - cfg: DecompressionConfig{Format: "gz"}, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: file, + done: make(chan struct{}), + metrics: newMetrics(prometheus.NewRegistry()), + cfg: DecompressionConfig{Format: "gz"}, } - d.readLines() + d.readLines(handler) <-d.done time.Sleep(time.Millisecond * 200) @@ -117,16 +123,16 @@ func TestOnelineFiles(t *testing.T) { defer handler.Stop() d := &decompressor{ - logger: log.NewNopLogger(), - running: atomic.NewBool(false), - handler: handler, - path: file, - 
done: make(chan struct{}), - metrics: newMetrics(prometheus.NewRegistry()), - cfg: DecompressionConfig{Format: "gz"}, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: file, + done: make(chan struct{}), + metrics: newMetrics(prometheus.NewRegistry()), + cfg: DecompressionConfig{Format: "gz"}, } - d.readLines() + d.readLines(handler) <-d.done time.Sleep(time.Millisecond * 200) @@ -142,16 +148,16 @@ func TestOnelineFiles(t *testing.T) { defer handler.Stop() d := &decompressor{ - logger: log.NewNopLogger(), - running: atomic.NewBool(false), - handler: handler, - path: file, - done: make(chan struct{}), - metrics: newMetrics(prometheus.NewRegistry()), - cfg: DecompressionConfig{Format: "bz2"}, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: file, + done: make(chan struct{}), + metrics: newMetrics(prometheus.NewRegistry()), + cfg: DecompressionConfig{Format: "bz2"}, } - d.readLines() + d.readLines(handler) <-d.done time.Sleep(time.Millisecond * 200) @@ -167,16 +173,16 @@ func TestOnelineFiles(t *testing.T) { defer handler.Stop() d := &decompressor{ - logger: log.NewNopLogger(), - running: atomic.NewBool(false), - handler: handler, - path: file, - done: make(chan struct{}), - metrics: newMetrics(prometheus.NewRegistry()), - cfg: DecompressionConfig{Format: "gz"}, + logger: log.NewNopLogger(), + running: atomic.NewBool(false), + receiver: loki.NewLogsReceiver(), + path: file, + done: make(chan struct{}), + metrics: newMetrics(prometheus.NewRegistry()), + cfg: DecompressionConfig{Format: "gz"}, } - d.readLines() + d.readLines(handler) <-d.done time.Sleep(time.Millisecond * 200) @@ -188,3 +194,115 @@ func TestOnelineFiles(t *testing.T) { require.Contains(t, firstEntry.Line, `5.202.214.160 - - [26/Jan/2019:19:45:25 +0330] "GET / HTTP/1.1" 200 30975 "https://www.zanbil.ir/" "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0" "-"`) }) } + +func TestDecompressor(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + l := util.TestLogger(t) + ch1 := loki.NewLogsReceiver() + tempDir := t.TempDir() + positionsFile, err := positions.New(l, positions.Config{ + SyncPeriod: 50 * time.Millisecond, + PositionsFile: filepath.Join(tempDir, "positions.yaml"), + IgnoreInvalidYaml: false, + ReadOnly: false, + }) + require.NoError(t, err) + filename := "testdata/onelinelog.tar.gz" + labels := model.LabelSet{ + "filename": model.LabelValue(filename), + "foo": "bar", + } + decompressor, err := newDecompressor( + newMetrics(nil), + l, + ch1, + positionsFile, + filename, + labels, + "", + DecompressionConfig{Format: "gz"}, + func() bool { return true }, + ) + go decompressor.Run() + + select { + case logEntry := <-ch1.Chan(): + require.Contains(t, logEntry.Line, "onelinelog.log") + case <-time.After(1 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(filename, labels.String()) + assert.NoError(c, err) + assert.Equal(c, int64(1), pos) + }, time.Second, 50*time.Millisecond) + + decompressor.Stop() + + // Run the decompressor again + go decompressor.Run() + select { + case <-ch1.Chan(): + t.Fatal("no message should be sent because of the position file") + case <-time.After(1 * time.Second): + } + + decompressor.Stop() + + positionsFile.Stop() +} + +func TestDecompressorPositionFileEntryDeleted(t *testing.T) { + 
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + l := util.TestLogger(t) + ch1 := loki.NewLogsReceiver() + tempDir := t.TempDir() + positionsFile, err := positions.New(l, positions.Config{ + SyncPeriod: 50 * time.Millisecond, + PositionsFile: filepath.Join(tempDir, "positions.yaml"), + IgnoreInvalidYaml: false, + ReadOnly: false, + }) + require.NoError(t, err) + filename := "testdata/onelinelog.tar.gz" + labels := model.LabelSet{ + "filename": model.LabelValue(filename), + "foo": "bar", + } + decompressor, err := newDecompressor( + newMetrics(nil), + l, + ch1, + positionsFile, + filename, + labels, + "", + DecompressionConfig{Format: "gz"}, + func() bool { return false }, + ) + go decompressor.Run() + + select { + case logEntry := <-ch1.Chan(): + require.Contains(t, logEntry.Line, "onelinelog.log") + case <-time.After(1 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(filename, labels.String()) + assert.NoError(c, err) + assert.Equal(c, int64(1), pos) + }, time.Second, 50*time.Millisecond) + + decompressor.Stop() + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(filename, labels.String()) + assert.NoError(c, err) + assert.Equal(c, int64(0), pos) + }, time.Second, 50*time.Millisecond) + + positionsFile.Stop() +} diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index dc1222e1fd..b6e7e46e8f 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -14,9 +14,11 @@ import ( "github.com/grafana/alloy/internal/component/common/loki/positions" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runner" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/tail/watch" "github.com/prometheus/common/model" + "go.uber.org/atomic" ) func init() { @@ -85,7 +87,11 @@ type Component struct { handler loki.LogsReceiver receivers []loki.LogsReceiver posFile positions.Positions - readers map[positions.Entry]reader + tasks map[positions.Entry]runnerTask + + stopping atomic.Bool + + updateReaders chan struct{} } // New creates a new loki.source.file component. @@ -113,10 +119,11 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, metrics: newMetrics(o.Registerer), - handler: loki.NewLogsReceiver(), - receivers: args.ForwardTo, - posFile: positionsFile, - readers: make(map[positions.Entry]reader), + handler: loki.NewLogsReceiver(), + receivers: args.ForwardTo, + posFile: positionsFile, + tasks: make(map[positions.Entry]runnerTask), + updateReaders: make(chan struct{}, 1), } // Call to Update() to start readers and set receivers once at the start. @@ -128,16 +135,17 @@ func New(o component.Options, args Arguments) (*Component, error) { } // Run implements component.Component. -// TODO(@tpaschalis). Should we periodically re-check? What happens if a target -// comes alive _after_ it's been passed to us and we never receive another -// Update()? Or should it be a responsibility of the discovery component? 
func (c *Component) Run(ctx context.Context) error { + runner := runner.New(func(t *runnerTask) runner.Worker { + return &runnerReader{ + reader: t.reader, + } + }) defer func() { level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping readers and positions file") c.mut.RLock() - for _, r := range c.readers { - r.Stop() - } + c.stopping.Store(true) + runner.Stop() c.posFile.Stop() close(c.handler.Chan()) c.mut.RUnlock() @@ -153,6 +161,14 @@ func (c *Component) Run(ctx context.Context) error { receiver.Chan() <- entry } c.mut.RUnlock() + case <-c.updateReaders: + c.mut.Lock() + var tasks []*runnerTask + for _, entry := range c.tasks { + tasks = append(tasks, &entry) + } + runner.ApplyTasks(ctx, tasks) + c.mut.Unlock() } } } @@ -162,28 +178,6 @@ func (c *Component) Update(args component.Arguments) error { c.updateMut.Lock() defer c.updateMut.Unlock() - // Stop all readers so we can recreate them below. This *must* be done before - // c.mut is held to avoid a race condition where stopping a reader is - // flushing its data, but the flush never succeeds because the Run goroutine - // fails to get a read lock. - // - // Stopping the readers avoids the issue we saw with stranded wrapped - // handlers staying behind until they were GC'ed and sending duplicate - // message to the global handler. It also makes sure that we update - // everything with the new labels. Simply zeroing out the c.readers map did - // not work correctly to shut down the wrapped handlers in time. - // - // TODO (@tpaschalis) We should be able to optimize this somehow and eg. - // cache readers for paths we already know about, and whose labels have not - // changed. Once we do that we should: - // - // * Call to c.pruneStoppedReaders to give cached but errored readers a - // chance to restart. - // * Stop tailing any files that were no longer in the new targets - // and conditionally remove their readers only by calling toStopTailing - // and c.stopTailingAndRemovePosition. - oldPaths := c.stopReaders() - newArgs := args.(Arguments) c.mut.Lock() @@ -191,11 +185,10 @@ func (c *Component) Update(args component.Arguments) error { c.args = newArgs c.receivers = newArgs.ForwardTo - c.readers = make(map[positions.Entry]reader) + c.tasks = make(map[positions.Entry]runnerTask) if len(newArgs.Targets) == 0 { level.Debug(c.opts.Logger).Log("msg", "no files targets were passed, nothing will be tailed") - return nil } for _, target := range newArgs.Targets { @@ -211,72 +204,47 @@ func (c *Component) Update(args component.Arguments) error { // Deduplicate targets which have the same public label set. readersKey := positions.Entry{Path: path, Labels: labels.String()} - if _, exist := c.readers[readersKey]; exist { + if _, exist := c.tasks[readersKey]; exist { continue } - c.reportSize(path, labels.String()) + c.reportSize(path) - handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {})) - reader, err := c.startTailing(path, labels, handler) + reader, err := c.createReader(path, labels) if err != nil { continue } - c.readers[readersKey] = readerWithHandler{ - reader: reader, - handler: handler, + c.tasks[readersKey] = runnerTask{ + reader: reader, + path: path, + labels: labels.String(), + // TODO: Could fastFingerPrint work? + readerHash: uint64(labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(path)}).Fingerprint()), } } - // Remove from the positions file any entries that had a Reader before, but - // are no longer in the updated set of Targets. 
- for r := range missing(c.readers, oldPaths) { - c.posFile.Remove(r.Path, r.Labels) + select { + case c.updateReaders <- struct{}{}: + default: } return nil } -// readerWithHandler combines a reader with an entry handler associated with -// it. Closing the reader will also close the handler. -type readerWithHandler struct { - reader - handler loki.EntryHandler -} - -func (r readerWithHandler) Stop() { - r.reader.Stop() - r.handler.Stop() -} - -// stopReaders stops existing readers and returns the set of paths which were -// stopped. -func (c *Component) stopReaders() map[positions.Entry]struct{} { - c.mut.RLock() - defer c.mut.RUnlock() - - stoppedPaths := make(map[positions.Entry]struct{}, len(c.readers)) - - for p, r := range c.readers { - stoppedPaths[p] = struct{}{} - r.Stop() - } - - return stoppedPaths -} - // DebugInfo returns information about the status of tailed targets. // TODO(@tpaschalis) Decorate with more debug information once it's made // available, such as the last time a log line was read. func (c *Component) DebugInfo() interface{} { + c.mut.Lock() + defer c.mut.Unlock() var res readerDebugInfo - for e, reader := range c.readers { + for e, task := range c.tasks { offset, _ := c.posFile.Get(e.Path, e.Labels) res.TargetsInfo = append(res.TargetsInfo, targetInfo{ Path: e.Path, Labels: e.Labels, - IsRunning: reader.IsRunning(), + IsRunning: task.reader.IsRunning(), ReadOffset: offset, }) } @@ -305,10 +273,9 @@ func missing(as map[positions.Entry]reader, bs map[positions.Entry]struct{}) map return c } -// startTailing starts and returns a reader for the given path. For most files, -// this will be a tailer implementation. If the file suffix alludes to it being -// a compressed file, then a decompressor will be started instead. -func (c *Component) startTailing(path string, labels model.LabelSet, handler loki.EntryHandler) (reader, error) { +// For most files, createReader returns a tailer implementation. If the file suffix alludes to it being +// a compressed file, then a decompressor will be created instead. 
+func (c *Component) createReader(path string, labels model.LabelSet) (reader, error) { fi, err := os.Stat(path) if err != nil { level.Error(c.opts.Logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", path) @@ -324,24 +291,23 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok var reader reader if c.args.DecompressionConfig.Enabled { - level.Debug(c.opts.Logger).Log("msg", "reading from compressed file", "filename", path) decompressor, err := newDecompressor( c.metrics, c.opts.Logger, - handler, + c.handler, c.posFile, path, - labels.String(), + labels, c.args.Encoding, c.args.DecompressionConfig, + c.IsStopping, ) if err != nil { - level.Error(c.opts.Logger).Log("msg", "failed to start decompressor", "error", err, "filename", path) - return nil, fmt.Errorf("failed to start decompressor %s", err) + level.Error(c.opts.Logger).Log("msg", "failed to create decompressor", "error", err, "filename", path) + return nil, fmt.Errorf("failed to create decompressor %s", err) } reader = decompressor } else { - level.Debug(c.opts.Logger).Log("msg", "tailing new file", "filename", path) pollOptions := watch.PollingFileWatcherOptions{ MinPollFrequency: c.args.FileWatch.MinPollFrequency, MaxPollFrequency: c.args.FileWatch.MaxPollFrequency, @@ -349,17 +315,18 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok tailer, err := newTailer( c.metrics, c.opts.Logger, - handler, + c.handler, c.posFile, path, - labels.String(), + labels, c.args.Encoding, pollOptions, c.args.TailFromEnd, + c.IsStopping, ) if err != nil { - level.Error(c.opts.Logger).Log("msg", "failed to start tailer", "error", err, "filename", path) - return nil, fmt.Errorf("failed to start tailer %s", err) + level.Error(c.opts.Logger).Log("msg", "failed to create tailer", "error", err, "filename", path) + return nil, fmt.Errorf("failed to create tailer %s", err) } reader = tailer } @@ -367,21 +334,14 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok return reader, nil } -func (c *Component) reportSize(path, labels string) { - // Ask the reader to update the size if a reader exists, this keeps - // position and size metrics in sync. 
- if reader, ok := c.readers[positions.Entry{Path: path, Labels: labels}]; ok { - err := reader.MarkPositionAndSize() - if err != nil { - level.Warn(c.opts.Logger).Log("msg", "failed to get file size from existing reader, ", "file", path, "error", err) - return - } - } else { - // Must be a new file, just directly read the size of it - fi, err := os.Stat(path) - if err != nil { - return - } - c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size())) +func (c *Component) IsStopping() bool { + return c.stopping.Load() +} + +func (c *Component) reportSize(path string) { + fi, err := os.Stat(path) + if err != nil { + return } + c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size())) } diff --git a/internal/component/loki/source/file/file_test.go b/internal/component/loki/source/file/file_test.go index fcd418b6f1..643f2d915c 100644 --- a/internal/component/loki/source/file/file_test.go +++ b/internal/component/loki/source/file/file_test.go @@ -314,3 +314,69 @@ func TestEncoding(t *testing.T) { "expected positions.yml file to be written eventually", ) } + +func TestDeleteRecreateFile(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + filename := "example" + + ctx, cancel := context.WithCancel(componenttest.TestContext(t)) + defer cancel() + + // Create file to log to. + f, err := os.Create(filename) + require.NoError(t, err) + + ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file") + require.NoError(t, err) + + ch1 := loki.NewLogsReceiver() + + go func() { + err := ctrl.Run(ctx, Arguments{ + Targets: []discovery.Target{{ + "__path__": f.Name(), + "foo": "bar", + }}, + ForwardTo: []loki.LogsReceiver{ch1}, + }) + require.NoError(t, err) + }() + + ctrl.WaitRunning(time.Minute) + + _, err = f.Write([]byte("writing some text\n")) + require.NoError(t, err) + + wantLabelSet := model.LabelSet{ + "filename": model.LabelValue(f.Name()), + "foo": "bar", + } + + checkMsg(t, ch1, "writing some text", 5*time.Second, wantLabelSet) + + require.NoError(t, f.Close()) + require.NoError(t, os.Remove(f.Name())) + + // Create a file with the same name + f, err = os.Create(filename) + require.NoError(t, err) + defer os.Remove(f.Name()) + defer f.Close() + + _, err = f.Write([]byte("writing some new text\n")) + require.NoError(t, err) + + checkMsg(t, ch1, "writing some new text", 5*time.Second, wantLabelSet) +} + +func checkMsg(t *testing.T, ch loki.LogsReceiver, msg string, timeout time.Duration, labelSet model.LabelSet) { + select { + case logEntry := <-ch.Chan(): + require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) + require.Equal(t, msg, logEntry.Line) + require.Equal(t, labelSet, logEntry.Labels) + case <-time.After(timeout): + require.FailNow(t, "failed waiting for log line") + } +} diff --git a/internal/component/loki/source/file/reader.go b/internal/component/loki/source/file/reader.go index 04016dc6c1..6e1703cd3a 100644 --- a/internal/component/loki/source/file/reader.go +++ b/internal/component/loki/source/file/reader.go @@ -5,8 +5,8 @@ package file // reader contains the set of methods the loki.source.file component uses. 
type reader interface { + Run() Stop() IsRunning() bool Path() string - MarkPositionAndSize() error } diff --git a/internal/component/loki/source/file/runner.go b/internal/component/loki/source/file/runner.go new file mode 100644 index 0000000000..6877504246 --- /dev/null +++ b/internal/component/loki/source/file/runner.go @@ -0,0 +1,73 @@ +package file + +import ( + "context" + "sync" + "time" + + "github.com/grafana/alloy/internal/runner" + "github.com/grafana/dskit/backoff" +) + +var _ runner.Task = (*runnerTask)(nil) + +type runnerTask struct { + reader reader + path string + labels string + readerHash uint64 +} + +func (r *runnerTask) Hash() uint64 { + return r.readerHash +} + +func (r *runnerTask) Equals(other runner.Task) bool { + otherTask := other.(*runnerTask) + + if r == otherTask { + return true + } + + return r.readerHash == otherTask.readerHash +} + +// runnerReader is a wrapper around a reader (tailer or decompressor) +// It is responsible for running the reader. If the reader stops running, +// it will retry it after a few seconds. This is useful to handle log file rotation +// when a file might be gone for a very short amount of time. +// The runner is only stopped when the corresponding target is gone or when the component is stopped. +type runnerReader struct { + reader reader +} + +func (r *runnerReader) Run(ctx context.Context) { + backoff := backoff.New( + ctx, + backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: 0, + }, + ) + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + <-ctx.Done() + r.reader.Stop() + }() + + for { + r.reader.Run() + backoff.Wait() + if !backoff.Ongoing() { + break + } + } + + // Wait for the stop function to exit to ensure that the reader was properly stopped. + wg.Wait() +} diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index d925d2ec5c..3afc51c1db 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -1,6 +1,6 @@ package file -// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// This code is adapted from loki/promtail. Last revision used to port changes to Alloy was a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. // tailer implements the reader interface by using the github.com/grafana/tail package to tail files. import ( @@ -31,81 +31,50 @@ import ( type tailer struct { metrics *metrics logger log.Logger - handler loki.EntryHandler + receiver loki.LogsReceiver positions positions.Positions - path string - labels string - tail *tail.Tail + path string + labelsStr string + labels model.LabelSet + + tailFromEnd bool + pollOptions watch.PollingFileWatcherOptions posAndSizeMtx sync.Mutex - stopOnce sync.Once running *atomic.Bool - posquit chan struct{} - posdone chan struct{} - done chan struct{} - decoder *encoding.Decoder -} - -func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, - labels string, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) { - // Simple check to make sure the file we are tailing doesn't - // have a position already saved which is past the end of the file. 
- fi, err := os.Stat(path) - if err != nil { - return nil, err - } - pos, err := positions.Get(path, labels) - if err != nil { - return nil, err - } + componentStopping func() bool - if fi.Size() < pos { - positions.Remove(path, labels) - } + mut sync.RWMutex + stopping bool + tail *tail.Tail + posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop + posdone chan struct{} // used by the updatePosition method to notify when it stopped + done chan struct{} // used by the readLine method to notify when it stopped - // If no cached position is found and the tailFromEnd option is enabled. - if pos == 0 && tailFromEnd { - pos, err = getLastLinePosition(path) - if err != nil { - level.Error(logger).Log("msg", "failed to get a position from the end of the file, default to start of file", err) - } else { - positions.Put(path, labels, pos) - level.Info(logger).Log("msg", "retrieved and stored the position of the last line") - } - } + decoder *encoding.Decoder +} - tail, err := tail.TailFile(path, tail.Config{ - Follow: true, - Poll: true, - ReOpen: true, - MustExist: true, - Location: &tail.SeekInfo{ - Offset: pos, - Whence: 0, - }, - Logger: util.NewLogAdapter(logger), - PollOptions: pollOptions, - }) - if err != nil { - return nil, err - } +func newTailer(metrics *metrics, logger log.Logger, receiver loki.LogsReceiver, positions positions.Positions, path string, + labels model.LabelSet, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool, componentStopping func() bool) (*tailer, error) { - logger = log.With(logger, "component", "tailer") tailer := &tailer{ - metrics: metrics, - logger: logger, - handler: loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler), - positions: positions, - path: path, - labels: labels, - tail: tail, - running: atomic.NewBool(false), - posquit: make(chan struct{}), - posdone: make(chan struct{}), - done: make(chan struct{}), + metrics: metrics, + logger: log.With(logger, "component", "tailer"), + receiver: receiver, + positions: positions, + path: path, + labels: labels, + labelsStr: labels.String(), + running: atomic.NewBool(false), + tailFromEnd: tailFromEnd, + pollOptions: pollOptions, + posquit: make(chan struct{}), + posdone: make(chan struct{}), + done: make(chan struct{}), + componentStopping: componentStopping, } if encoding != "" { @@ -118,9 +87,6 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p tailer.decoder = decoder } - go tailer.readLines() - go tailer.updatePosition() - metrics.filesActive.Add(1.) return tailer, nil } @@ -180,10 +146,84 @@ func getLastLinePosition(path string) (int64, error) { } } +func (t *tailer) Run() { + t.mut.Lock() + + // Check if the stop function was called between two Run. + if t.stopping { + close(t.done) + close(t.posdone) + close(t.posquit) + t.mut.Unlock() + return + } + + fi, err := os.Stat(t.path) + if err != nil { + level.Error(t.logger).Log("msg", "failed to tail file", "path", t.path, "err", err) + return + } + pos, err := t.positions.Get(t.path, t.labelsStr) + if err != nil { + level.Error(t.logger).Log("msg", "failed to get file position", "err", err) + return + } + + // NOTE: The code assumes that if a position is available and that the file is bigger than the position, then + // the tail should start from the position. 
This may not always be desired in situations where the file was rotated + // with a file that has the same name but different content and a bigger size than the previous one. This problem would + // mostly show up on Windows because on Unix systems, the readLines function does not exit on file rotation. + // If this ever becomes a problem, we may want to consider saving and comparing file creation timestamps. + if fi.Size() < pos { + t.positions.Remove(t.path, t.labelsStr) + } + + // If no cached position is found and the tailFromEnd option is enabled. + if pos == 0 && t.tailFromEnd { + pos, err = getLastLinePosition(t.path) + if err != nil { + level.Error(t.logger).Log("msg", "failed to get a position from the end of the file, default to start of file", "err", err) + } else { + t.positions.Put(t.path, t.labelsStr, pos) + level.Info(t.logger).Log("msg", "retrieved and stored the position of the last line") + } + } + labelsMiddleware := t.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(t.path)}) + handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(t.receiver.Chan(), func() {})) + defer handler.Stop() + + tail, err := tail.TailFile(t.path, tail.Config{ + Follow: true, + Poll: true, + ReOpen: true, + MustExist: true, + Location: &tail.SeekInfo{ + Offset: pos, + Whence: 0, + }, + Logger: util.NewLogAdapter(t.logger), + PollOptions: t.pollOptions, + }) + if err != nil { + level.Error(t.logger).Log("msg", "failed to tail the file", "err", err) + return + } + t.tail = tail + + t.posquit = make(chan struct{}) + t.posdone = make(chan struct{}) + t.done = make(chan struct{}) + t.mut.Unlock() + + go t.updatePosition() + t.metrics.filesActive.Add(1.) + t.readLines(handler) +} + // updatePosition is run in a goroutine and checks the current size of the file // and saves it to the positions file at a regular interval. If there is ever // an error it stops the tailer and exits, the tailer will be re-opened by the -// filetarget sync method if it still exists and will start reading from the +// backoff retry method if it still exists and will start reading from the // last successful entry in the positions file. func (t *tailer) updatePosition() { positionSyncPeriod := t.positions.SyncPeriod() @@ -191,7 +231,7 @@ func (t *tailer) updatePosition() { defer func() { positionWait.Stop() level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path) - // NOTE: metrics must be cleaned up after the position timer exits, as MarkPositionAndSize() updates metrics. + // NOTE: metrics must be cleaned up after the position timer exits, as markPositionAndSize() updates metrics. t.cleanupMetrics() close(t.posdone) }() @@ -199,7 +239,7 @@ func (t *tailer) updatePosition() { for { select { case <-positionWait.C: - err := t.MarkPositionAndSize() + err := t.markPositionAndSize() if err != nil { level.Error(t.logger).Log("msg", "position timer: error getting tail position and/or size, stopping tailer", "path", t.path, "error", err) err := t.tail.Stop() @@ -214,19 +254,17 @@ func (t *tailer) updatePosition() { } } -// readLines runs in a goroutine and consumes the t.tail.Lines channel from the -// underlying tailer. Et will only exit when that channel is closed. This is +// readLines consumes the t.tail.Lines channel from the +// underlying tailer. It will only exit when that channel is closed.
This is // important to avoid a deadlock in the underlying tailer which can happen if // there are unread lines in this channel and the Stop method on the tailer is // called, the underlying tailer will never exit if there are unread lines in // the t.tail.Lines channel -func (t *tailer) readLines() { +func (t *tailer) readLines(handler loki.EntryHandler) { level.Info(t.logger).Log("msg", "tail routine: started", "path", t.path) t.running.Store(true) - // This function runs in a goroutine, if it exits this tailer will never do any more tailing. - // Clean everything up. defer func() { t.running.Store(false) level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path) @@ -234,7 +272,7 @@ func (t *tailer) readLines() { // Shut down the position marker thread close(t.posquit) }() - entries := t.handler.Chan() + entries := handler.Chan() for { line, ok := <-t.tail.Lines if !ok { @@ -272,8 +310,8 @@ func (t *tailer) readLines() { } } -func (t *tailer) MarkPositionAndSize() error { - // Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file. +func (t *tailer) markPositionAndSize() error { + // Lock this update because it can be called in two different goroutines t.posAndSizeMtx.Lock() defer t.posAndSizeMtx.Unlock() @@ -295,40 +333,51 @@ func (t *tailer) MarkPositionAndSize() error { // Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped. t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size)) t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos)) - t.positions.Put(t.path, t.labels, pos) + t.positions.Put(t.path, t.labelsStr, pos) return nil } func (t *tailer) Stop() { - // stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once - // we wrap the stop in a sync.Once. - t.stopOnce.Do(func() { - // Save the current position before shutting down tailer - err := t.MarkPositionAndSize() - if err != nil { - level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err) - } + t.mut.RLock() + t.stopping = true + defer func() { + t.stopping = false + }() + + // Save the current position before shutting down tailer + err := t.markPositionAndSize() + if err != nil { + level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err) + } - // Stop the underlying tailer to prevent resource leak. + // Stop the underlying tailer to prevent resource leak. + if t.tail != nil { err = t.tail.Stop() - if err != nil { - if utils.IsEphemeralOrFileClosed(err) { - // Don't log as error if the file is already closed, or we got an ephemeral error - it's a common case - // when files are rotating while being read and the tailer would have stopped correctly anyway. - level.Debug(t.logger).Log("msg", "tailer stopped with file I/O error", "path", t.path, "error", err) - } else { - // Log as error for other reasons, as a resource leak may have happened. - level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", err) - } + } + t.mut.RUnlock() + + if err != nil { + if utils.IsEphemeralOrFileClosed(err) { + // Don't log as error if the file is already closed, or we got an ephemeral error - it's a common case + // when files are rotating while being read and the tailer would have stopped correctly anyway. 
+ level.Debug(t.logger).Log("msg", "tailer stopped with file I/O error", "path", t.path, "error", err) + } else { + // Log as error for other reasons, as a resource leak may have happened. + level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", err) } - // Wait for readLines() to consume all the remaining messages and exit when the channel is closed - <-t.done - // Wait for the position marker thread to exit - <-t.posdone - level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) - t.handler.Stop() - }) + } + // Wait for readLines() to consume all the remaining messages and exit when the channel is closed + <-t.done + // Wait for the position marker thread to exit + <-t.posdone + level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) + + // If the component is not stopping, then it means that the target for this component is gone and that + // we should clear the entry from the positions file. + if !t.componentStopping() { + t.positions.Remove(t.path, t.labelsStr) + } } func (t *tailer) IsRunning() bool { diff --git a/internal/component/loki/source/file/tailer_test.go b/internal/component/loki/source/file/tailer_test.go index b023ea631f..397ca3ddf5 100644 --- a/internal/component/loki/source/file/tailer_test.go +++ b/internal/component/loki/source/file/tailer_test.go @@ -3,7 +3,18 @@ package file import ( "bytes" "os" + "path/filepath" "testing" + "time" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/common/loki/positions" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/tail/watch" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" ) func createTempFileWithContent(t *testing.T, content []byte) string { @@ -78,3 +89,147 @@ func TestGetLastLinePosition(t *testing.T) { }) } } + +func TestTailer(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + l := util.TestLogger(t) + ch1 := loki.NewLogsReceiver() + tempDir := t.TempDir() + logFile, err := os.CreateTemp(tempDir, "example") + require.NoError(t, err) + positionsFile, err := positions.New(l, positions.Config{ + SyncPeriod: 50 * time.Millisecond, + PositionsFile: filepath.Join(tempDir, "positions.yaml"), + IgnoreInvalidYaml: false, + ReadOnly: false, + }) + require.NoError(t, err) + labels := model.LabelSet{ + "filename": model.LabelValue(logFile.Name()), + "foo": "bar", + } + tailer, err := newTailer( + newMetrics(nil), + l, + ch1, + positionsFile, + logFile.Name(), + labels, + "", + watch.PollingFileWatcherOptions{ + MinPollFrequency: 25 * time.Millisecond, + MaxPollFrequency: 25 * time.Millisecond, + }, + false, + func() bool { return true }, + ) + go tailer.Run() + + _, err = logFile.Write([]byte("writing some text\n")) + require.NoError(t, err) + select { + case logEntry := <-ch1.Chan(): + require.Equal(t, "writing some text", logEntry.Line) + case <-time.After(1 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(logFile.Name(), labels.String()) + assert.NoError(c, err) + assert.Equal(c, int64(18), pos) + }, time.Second, 50*time.Millisecond) + + tailer.Stop() + + // Run the tailer again + go tailer.Run() + select { + case <-ch1.Chan(): + t.Fatal("no message should be sent because of the position file") + case <-time.After(1 * 
time.Second): + } + + // Write logs again + _, err = logFile.Write([]byte("writing some new text\n")) + require.NoError(t, err) + select { + case logEntry := <-ch1.Chan(): + require.Equal(t, "writing some new text", logEntry.Line) + case <-time.After(1 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + + tailer.Stop() + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(logFile.Name(), labels.String()) + assert.NoError(c, err) + assert.Equal(c, int64(40), pos) + }, time.Second, 50*time.Millisecond) + + positionsFile.Stop() + require.NoError(t, logFile.Close()) +} + +func TestTailerPositionFileEntryDeleted(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + l := util.TestLogger(t) + ch1 := loki.NewLogsReceiver() + tempDir := t.TempDir() + logFile, err := os.CreateTemp(tempDir, "example") + require.NoError(t, err) + positionsFile, err := positions.New(l, positions.Config{ + SyncPeriod: 50 * time.Millisecond, + PositionsFile: filepath.Join(tempDir, "positions.yaml"), + IgnoreInvalidYaml: false, + ReadOnly: false, + }) + require.NoError(t, err) + labels := model.LabelSet{ + "filename": model.LabelValue(logFile.Name()), + "foo": "bar", + } + tailer, err := newTailer( + newMetrics(nil), + l, + ch1, + positionsFile, + logFile.Name(), + labels, + "", + watch.PollingFileWatcherOptions{ + MinPollFrequency: 25 * time.Millisecond, + MaxPollFrequency: 25 * time.Millisecond, + }, + false, + func() bool { return false }, + ) + go tailer.Run() + + _, err = logFile.Write([]byte("writing some text\n")) + require.NoError(t, err) + select { + case logEntry := <-ch1.Chan(): + require.Equal(t, "writing some text", logEntry.Line) + case <-time.After(1 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(logFile.Name(), labels.String()) + assert.NoError(c, err) + assert.Equal(c, int64(18), pos) + }, time.Second, 50*time.Millisecond) + + tailer.Stop() + + require.EventuallyWithT(t, func(c *assert.CollectT) { + pos, err := positionsFile.Get(logFile.Name(), labels.String()) + assert.NoError(c, err) + assert.Equal(c, int64(0), pos) + }, time.Second, 50*time.Millisecond) + + positionsFile.Stop() + require.NoError(t, logFile.Close()) +}