Skip to content

Commit

Permalink
fix: 代码review问题处理
Browse files Browse the repository at this point in the history
  • Loading branch information
jayjiahua committed Sep 25, 2023
1 parent 0f7c83d commit fec91a0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
7 changes: 5 additions & 2 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ func (h *Harvester) Run() error {
removedCheckTick = time.After(h.config.ScanFrequency)
}

removedTick := make(<-chan time.Time)

L:
for {
select {
Expand All @@ -217,13 +219,14 @@ func (h *Harvester) Run() error {
if h.reader.fileReader.log.fs.Removed() {
logp.Info("Closing harvester because file was removed: %s, wait for 60s", source)
// 等待 60s 才停止,避免日志没来得及采完就关掉导致漏采
time.Sleep(60 * time.Second)
break L
removedTick = time.After(60 * time.Second)
} else {
logp.Debug("harvester", "File was not removed: %s, check again after %v", source, h.config.ScanFrequency)
// update timer
removedCheckTick = time.After(h.config.ScanFrequency)
}
case <-removedTick:
break L
// Required when reader loop returns and reader finished
case <-h.done:
break L
Expand Down
5 changes: 4 additions & 1 deletion libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ func (r *LineReader) Next() ([]byte, int, error) {
if err != nil {
if r.inBuffer.Len() == 0 || !(err.Error() == "file was removed" || err.Error() == "file inactive") {
// buffer为空,或者不属于文件被删除以及文件不活跃的错误,直接返回空
return nil, 0, err
// return and reset consumed bytes count
sz := r.byteCount
r.byteCount = 0
return nil, sz, err
}

logp.Info("LineReader get an advance err: %s, send all %d bytes in buffer", err, r.inBuffer.Len())
Expand Down

0 comments on commit fec91a0

Please sign in to comment.