Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use runner pkg in loki source file #2428

Merged
merged 13 commits into from
Feb 4, 2025
Prev Previous commit
Next Next commit
delete positions entry when the target is gone
wildum committed Jan 23, 2025
commit 85c92d683e2687691d5aea34c20149d030db8958
44 changes: 27 additions & 17 deletions internal/component/loki/source/file/decompresser.go
Original file line number Diff line number Diff line change
@@ -60,6 +60,8 @@ type decompressor struct {
size int64
cfg DecompressionConfig

componentStopping func() bool

mut sync.RWMutex
stopping bool
posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
@@ -76,6 +78,7 @@ func newDecompressor(
labels model.LabelSet,
encodingFormat string,
cfg DecompressionConfig,
componentStopping func() bool,
) (*decompressor, error) {

labelsStr := labels.String()
@@ -98,20 +101,21 @@ func newDecompressor(
}

decompressor := &decompressor{
metrics: metrics,
logger: logger,
receiver: receiver,
positions: positions,
path: path,
labels: labels,
labelsStr: labelsStr,
running: atomic.NewBool(false),
posquit: make(chan struct{}),
posdone: make(chan struct{}),
done: make(chan struct{}),
position: pos,
decoder: decoder,
cfg: cfg,
metrics: metrics,
logger: logger,
receiver: receiver,
positions: positions,
path: path,
labels: labels,
labelsStr: labelsStr,
running: atomic.NewBool(false),
posquit: make(chan struct{}),
posdone: make(chan struct{}),
done: make(chan struct{}),
position: pos,
decoder: decoder,
cfg: cfg,
componentStopping: componentStopping,
}

return decompressor, nil
@@ -322,9 +326,15 @@ func (d *decompressor) Stop() {
<-d.done
level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)

// Save the current position before shutting down reader
if err := d.markPositionAndSize(); err != nil {
level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err)
// If the component is not stopping, then it means that the target for this component is gone and that
// we should clear the entry from the positions file.
if !d.componentStopping() {
d.positions.Remove(d.path, d.labelsStr)
} else {
// Save the current position before shutting down reader
if err := d.markPositionAndSize(); err != nil {
level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err)
}
}
}

55 changes: 55 additions & 0 deletions internal/component/loki/source/file/decompresser_test.go
Original file line number Diff line number Diff line change
@@ -221,6 +221,7 @@ func TestDecompressor(t *testing.T) {
labels,
"",
DecompressionConfig{Format: "gz"},
func() bool { return true },
)
go decompressor.Run()

@@ -251,3 +252,57 @@ func TestDecompressor(t *testing.T) {

positionsFile.Stop()
}

// TestDecompressorPositionFileEntryDeleted checks that stopping a decompressor
// whose componentStopping callback reports false (the component keeps running,
// only this target went away) clears the file's entry from the positions file.
func TestDecompressorPositionFileEntryDeleted(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
	l := util.TestLogger(t)
	ch1 := loki.NewLogsReceiver()
	tempDir := t.TempDir()
	positionsFile, err := positions.New(l, positions.Config{
		SyncPeriod:        50 * time.Millisecond,
		PositionsFile:     filepath.Join(tempDir, "positions.yaml"),
		IgnoreInvalidYaml: false,
		ReadOnly:          false,
	})
	require.NoError(t, err)
	filename := "testdata/onelinelog.tar.gz"
	labels := model.LabelSet{
		"filename": model.LabelValue(filename),
		"foo":      "bar",
	}
	decompressor, err := newDecompressor(
		newMetrics(nil),
		l,
		ch1,
		positionsFile,
		filename,
		labels,
		"",
		DecompressionConfig{Format: "gz"},
		// The component is NOT stopping: Stop must treat the target as gone.
		func() bool { return false },
	)
	// Fail with a clear message instead of panicking inside the goroutine
	// below if the decompressor could not be constructed.
	require.NoError(t, err)
	go decompressor.Run()

	// Wait for the single log line so that a position gets recorded.
	select {
	case logEntry := <-ch1.Chan():
		require.Contains(t, logEntry.Line, "onelinelog.log")
	case <-time.After(1 * time.Second):
		require.FailNow(t, "failed waiting for log line")
	}

	// The entry must exist before the decompressor is stopped.
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		pos, err := positionsFile.Get(filename, labels.String())
		assert.NoError(c, err)
		assert.Equal(c, int64(1), pos)
	}, time.Second, 50*time.Millisecond)

	decompressor.Stop()

	// After Stop the entry is expected to read back as position 0, i.e. it
	// has been removed rather than saved.
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		pos, err := positionsFile.Get(filename, labels.String())
		assert.NoError(c, err)
		assert.Equal(c, int64(0), pos)
	}, time.Second, 50*time.Millisecond)

	positionsFile.Stop()
}
10 changes: 10 additions & 0 deletions internal/component/loki/source/file/file.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ import (
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/tail/watch"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
)

func init() {
@@ -88,6 +89,8 @@ type Component struct {
posFile positions.Positions
tasks map[positions.Entry]runnerTask

stopping atomic.Bool

updateReaders chan struct{}
}

@@ -141,6 +144,7 @@ func (c *Component) Run(ctx context.Context) error {
defer func() {
level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping readers and positions file")
c.mut.RLock()
c.stopping.Store(true)
runner.Stop()
c.posFile.Stop()
close(c.handler.Chan())
@@ -296,6 +300,7 @@ func (c *Component) createReader(path string, labels model.LabelSet) (reader, er
labels,
c.args.Encoding,
c.args.DecompressionConfig,
c.IsStopping,
)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to create decompressor", "error", err, "filename", path)
@@ -317,6 +322,7 @@ func (c *Component) createReader(path string, labels model.LabelSet) (reader, er
c.args.Encoding,
pollOptions,
c.args.TailFromEnd,
c.IsStopping,
)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to create tailer", "error", err, "filename", path)
@@ -328,6 +334,10 @@ func (c *Component) createReader(path string, labels model.LabelSet) (reader, er
return reader, nil
}

// IsStopping reports the current value of the component's stopping flag.
// Run's deferred shutdown handler sets the flag to true before it stops the
// runner; createReader hands this method to newly created readers as their
// componentStopping callback.
func (c *Component) IsStopping() bool {
return c.stopping.Load()
}

func (c *Component) reportSize(path string) {
fi, err := os.Stat(path)
if err != nil {
37 changes: 23 additions & 14 deletions internal/component/loki/source/file/tailer.go
Original file line number Diff line number Diff line change
@@ -45,6 +45,8 @@ type tailer struct {

running *atomic.Bool

componentStopping func() bool

mut sync.RWMutex
stopping bool
tail *tail.Tail
@@ -56,22 +58,23 @@ type tailer struct {
}

func newTailer(metrics *metrics, logger log.Logger, receiver loki.LogsReceiver, positions positions.Positions, path string,
labels model.LabelSet, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) {
labels model.LabelSet, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool, componentStopping func() bool) (*tailer, error) {

tailer := &tailer{
metrics: metrics,
logger: log.With(logger, "component", "tailer"),
receiver: receiver,
positions: positions,
path: path,
labels: labels,
labelsStr: labels.String(),
running: atomic.NewBool(false),
tailFromEnd: tailFromEnd,
pollOptions: pollOptions,
posquit: make(chan struct{}),
posdone: make(chan struct{}),
done: make(chan struct{}),
metrics: metrics,
logger: log.With(logger, "component", "tailer"),
receiver: receiver,
positions: positions,
path: path,
labels: labels,
labelsStr: labels.String(),
running: atomic.NewBool(false),
tailFromEnd: tailFromEnd,
pollOptions: pollOptions,
posquit: make(chan struct{}),
posdone: make(chan struct{}),
done: make(chan struct{}),
componentStopping: componentStopping,
}

if encoding != "" {
@@ -369,6 +372,12 @@ func (t *tailer) Stop() {
// Wait for the position marker thread to exit
<-t.posdone
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)

// If the component is not stopping, then it means that the target for this component is gone and that
// we should clear the entry from the positions file.
if !t.componentStopping() {
t.positions.Remove(t.path, t.labelsStr)
}
}

func (t *tailer) IsRunning() bool {
62 changes: 62 additions & 0 deletions internal/component/loki/source/file/tailer_test.go
Original file line number Diff line number Diff line change
@@ -121,6 +121,7 @@ func TestTailer(t *testing.T) {
MaxPollFrequency: 25 * time.Millisecond,
},
false,
func() bool { return true },
)
go tailer.Run()

@@ -169,3 +170,64 @@ func TestTailer(t *testing.T) {

positionsFile.Stop()
}

// TestTailerPositionFileEntryDeleted checks that stopping a tailer whose
// componentStopping callback reports false (the component keeps running, only
// this target went away) clears the file's entry from the positions file.
func TestTailerPositionFileEntryDeleted(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
	l := util.TestLogger(t)
	ch1 := loki.NewLogsReceiver()
	tempDir := t.TempDir()
	logFile, err := os.CreateTemp(tempDir, "example")
	require.NoError(t, err)
	// Release the descriptor so the temp dir can be cleaned up; the close
	// error is irrelevant to what this test asserts, so it is ignored.
	defer func() { _ = logFile.Close() }()
	positionsFile, err := positions.New(l, positions.Config{
		SyncPeriod:        50 * time.Millisecond,
		PositionsFile:     filepath.Join(tempDir, "positions.yaml"),
		IgnoreInvalidYaml: false,
		ReadOnly:          false,
	})
	require.NoError(t, err)
	labels := model.LabelSet{
		"filename": model.LabelValue(logFile.Name()),
		"foo":      "bar",
	}
	tailer, err := newTailer(
		newMetrics(nil),
		l,
		ch1,
		positionsFile,
		logFile.Name(),
		labels,
		"",
		watch.PollingFileWatcherOptions{
			MinPollFrequency: 25 * time.Millisecond,
			MaxPollFrequency: 25 * time.Millisecond,
		},
		false,
		// The component is NOT stopping: Stop must treat the target as gone.
		func() bool { return false },
	)
	// Fail with a clear message instead of panicking inside the goroutine
	// below if the tailer could not be constructed.
	require.NoError(t, err)
	go tailer.Run()

	_, err = logFile.Write([]byte("writing some text\n"))
	require.NoError(t, err)
	select {
	case logEntry := <-ch1.Chan():
		require.Equal(t, "writing some text", logEntry.Line)
	case <-time.After(1 * time.Second):
		require.FailNow(t, "failed waiting for log line")
	}

	// The entry must exist before the tailer is stopped; 18 is the byte
	// length of the line written above, including the trailing newline.
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		pos, err := positionsFile.Get(logFile.Name(), labels.String())
		assert.NoError(c, err)
		assert.Equal(c, int64(18), pos)
	}, time.Second, 50*time.Millisecond)

	tailer.Stop()

	// After Stop the entry is expected to read back as position 0, i.e. it
	// has been removed rather than saved.
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		pos, err := positionsFile.Get(logFile.Name(), labels.String())
		assert.NoError(c, err)
		assert.Equal(c, int64(0), pos)
	}, time.Second, 50*time.Millisecond)

	positionsFile.Stop()
}