Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use runner pkg in loki source file #2428

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ Main (unreleased)

- Add json format support for log export via faro receiver (@ravishankar15)

### Bugfixes

- Fix log rotation for Windows in `loki.source.file` by refactoring the component to use the runner pkg. This should also reduce CPU consumption when tailing a lot of files in a dynamic environment. (@wildum)

v1.6.0-rc.1
-----------------

Expand Down
82 changes: 45 additions & 37 deletions internal/component/loki/source/file/decompresser.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package file

// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// This code is adapted from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// Decompressor implements the reader interface and is used to read compressed log files.
// It uses the Go stdlib's compress/* packages for decoding.

Expand Down Expand Up @@ -43,19 +43,19 @@ func supportedCompressedFormats() map[string]struct{} {
type decompressor struct {
metrics *metrics
logger log.Logger
handler loki.EntryHandler
receiver loki.LogsReceiver
positions positions.Positions

path string
labels string
path string
labels model.LabelSet
labelsStr string

posAndSizeMtx sync.Mutex
stopOnce sync.Once

running *atomic.Bool
posquit chan struct{}
posdone chan struct{}
done chan struct{}
posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
posdone chan struct{} // used by the updatePosition method to notify when it stopped
done chan struct{} // used by the readLine method to notify when it stopped

decoder *encoding.Decoder

Expand All @@ -67,17 +67,19 @@ type decompressor struct {
func newDecompressor(
metrics *metrics,
logger log.Logger,
handler loki.EntryHandler,
receiver loki.LogsReceiver,
positions positions.Positions,
path string,
labels string,
labels model.LabelSet,
encodingFormat string,
cfg DecompressionConfig,
) (*decompressor, error) {

labelsStr := labels.String()

logger = log.With(logger, "component", "decompressor")

pos, err := positions.Get(path, labels)
pos, err := positions.Get(path, labelsStr)
if err != nil {
return nil, fmt.Errorf("failed to get positions: %w", err)
}
Expand All @@ -95,10 +97,11 @@ func newDecompressor(
decompressor := &decompressor{
metrics: metrics,
logger: logger,
handler: loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler),
receiver: receiver,
positions: positions,
path: path,
labels: labels,
labelsStr: labelsStr,
running: atomic.NewBool(false),
posquit: make(chan struct{}),
posdone: make(chan struct{}),
Expand All @@ -108,9 +111,6 @@ func newDecompressor(
cfg: cfg,
}

go decompressor.readLines()
go decompressor.updatePosition()
metrics.filesActive.Add(1.)
return decompressor, nil
}

Expand Down Expand Up @@ -151,6 +151,18 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade
return reader, nil
}

func (d *decompressor) Run() {
labelsMiddleware := d.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(d.path)})
handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {}))
defer handler.Stop()
d.posquit = make(chan struct{})
d.posdone = make(chan struct{})
d.done = make(chan struct{})
go d.updatePosition()
d.metrics.filesActive.Add(1.)
d.readLines(handler)
}

func (d *decompressor) updatePosition() {
positionSyncPeriod := d.positions.SyncPeriod()
positionWait := time.NewTicker(positionSyncPeriod)
Expand All @@ -163,7 +175,7 @@ func (d *decompressor) updatePosition() {
for {
select {
case <-positionWait.C:
if err := d.MarkPositionAndSize(); err != nil {
if err := d.markPositionAndSize(); err != nil {
level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err)
return
}
Expand All @@ -178,7 +190,7 @@ func (d *decompressor) updatePosition() {
// It first decompresses the file as a whole using a reader and then it will iterate
// over its chunks, separated by '\n'.
// During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp.
func (d *decompressor) readLines() {
func (d *decompressor) readLines(handler loki.EntryHandler) {
level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path)
d.running.Store(true)

Expand All @@ -188,11 +200,12 @@ func (d *decompressor) readLines() {
}

defer func() {
d.running.Store(false)
d.cleanupMetrics()
level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path)
close(d.done)
}()
entries := d.handler.Chan()
entries := handler.Chan()

f, err := os.Open(d.path)
if err != nil {
Expand Down Expand Up @@ -261,36 +274,31 @@ func (d *decompressor) readLines() {
}
}

func (d *decompressor) MarkPositionAndSize() error {
// Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file.
func (d *decompressor) markPositionAndSize() error {
// Lock this update because it can be called in two different threads
wildum marked this conversation as resolved.
Show resolved Hide resolved
d.posAndSizeMtx.Lock()
defer d.posAndSizeMtx.Unlock()

d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size))
wildum marked this conversation as resolved.
Show resolved Hide resolved
d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position))
d.positions.Put(d.path, d.labels, d.position)
d.positions.Put(d.path, d.labelsStr, d.position)

return nil
}

func (d *decompressor) Stop() {
// stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once
// we wrap the stop in a sync.Once.
d.stopOnce.Do(func() {
// Shut down the position marker thread
close(d.posquit)
<-d.posdone

// Save the current position before shutting down reader
if err := d.MarkPositionAndSize(); err != nil {
level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err)
}
// Shut down the position marker thread
close(d.posquit)
<-d.posdone

// Save the current position before shutting down reader
if err := d.markPositionAndSize(); err != nil {
level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err)
}

// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
<-d.done
level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)
d.handler.Stop()
})
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
wildum marked this conversation as resolved.
Show resolved Hide resolved
<-d.done
level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)
}

func (d *decompressor) IsRunning() bool {
Expand Down
Loading
Loading