Skip to content

Commit

Permalink
Merge pull request #24 from jayjiahua/fix/log_without_newline
Browse files Browse the repository at this point in the history
fix: 修复当文件最后一行日志没有换行符会导致句柄不释放的问题
  • Loading branch information
liuwenping authored Sep 25, 2023
2 parents 4bc1fc2 + fec91a0 commit 6aece75
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
11 changes: 9 additions & 2 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ func (h *Harvester) Run() error {
removedCheckTick = time.After(h.config.ScanFrequency)
}

removedTick := make(<-chan time.Time)

L:
for {
select {
Expand All @@ -215,13 +217,16 @@ func (h *Harvester) Run() error {
case <-removedCheckTick:
// 通过旁路判断文件是否被删除,避免输出堵塞时未能执行 errorChecks 导致已删文件的 fd 没有被及时释放的问题
if h.reader.fileReader.log.fs.Removed() {
logp.Info("Closing harvester because file was removed: %s", source)
break L
logp.Info("Closing harvester because file was removed: %s, wait for 60s", source)
// 等待 60s 才停止,避免日志没来得及采完就关掉导致漏采
removedTick = time.After(60 * time.Second)
} else {
logp.Debug("harvester", "File was not removed: %s, check again after %v", source, h.config.ScanFrequency)
// update timer
removedCheckTick = time.After(h.config.ScanFrequency)
}
case <-removedTick:
break L
// Required when reader loop returns and reader finished
case <-h.done:
break L
Expand All @@ -232,6 +237,8 @@ func (h *Harvester) Run() error {

// Close reader
h.reader.Stop()

logp.Info("Harvester stopped: %s", source)
}(h.state.Source)

logp.Info("Harvester started for file: %s, offset: %d", h.state.Source, h.state.Offset)
Expand Down
49 changes: 48 additions & 1 deletion libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,54 @@ func (r *LineReader) Next() ([]byte, int, error) {
// read next 'potential' line from input buffer/reader
err := r.advance()
if err != nil {
return nil, 0, err
if r.inBuffer.Len() == 0 || !(err.Error() == "file was removed" || err.Error() == "file inactive") {
// buffer为空,或者不属于文件被删除以及文件不活跃的错误,直接返回空
// return and reset consumed bytes count
sz := r.byteCount
r.byteCount = 0
return nil, sz, err
}

logp.Info("LineReader get an advance err: %s, send all %d bytes in buffer", err, r.inBuffer.Len())
// Found EOF and collectOnEOF is true
// -> decode input sequence into outBuffer
// let's take whole buffer len without len(nl) if it ends with it
end := r.inBuffer.Len()
if bytes.HasSuffix(r.inBuffer.Bytes(), r.nl) {
end -= len(r.nl)
}

sz, err := r.decode(end)
if err != nil {
logp.Err("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = r.inBuffer.Len()
}

// Consume transformed bytes from input buffer
_ = r.inBuffer.Advance(sz)
r.inBuffer.Reset()

// continue scanning input buffer from last position + 1
r.inOffset = end - sz
if r.inOffset < 0 {
// fix inOffset if '\n' has encoding > 8bits + firl line has been decoded
r.inOffset = 0
}
// output buffer contains untile EOF. Extract
// byte slice from buffer and reset output buffer.
bytes, err := r.outBuffer.Collect(r.outBuffer.Len())
r.outBuffer.Reset()
if err != nil {
// This should never happen as otherwise we have a broken state
panic(err)
}

// return and reset consumed bytes count
sz = r.byteCount
r.byteCount = 0
return bytes, sz, err

}

// Check last decoded byte really being '\n' also unencoded
Expand Down

0 comments on commit 6aece75

Please sign in to comment.