From fec91a04103d402c304def8e0f7e758451d4ceaa Mon Sep 17 00:00:00 2001 From: jayjiahua <553544693@qq.com> Date: Mon, 25 Sep 2023 16:19:13 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BB=A3=E7=A0=81review=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- filebeat/input/log/harvester.go | 7 +++++-- libbeat/reader/readfile/line.go | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 4be06815a92e..0126a02c6599 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -204,6 +204,8 @@ func (h *Harvester) Run() error { removedCheckTick = time.After(h.config.ScanFrequency) } + removedTick := make(<-chan time.Time) + L: for { select { @@ -217,13 +219,14 @@ func (h *Harvester) Run() error { if h.reader.fileReader.log.fs.Removed() { logp.Info("Closing harvester because file was removed: %s, wait for 60s", source) // 等待 60s 才停止,避免日志没来得及采完就关掉导致漏采 - time.Sleep(60 * time.Second) - break L + removedTick = time.After(60 * time.Second) } else { logp.Debug("harvester", "File was not removed: %s, check again after %v", source, h.config.ScanFrequency) // update timer removedCheckTick = time.After(h.config.ScanFrequency) } + case <-removedTick: + break L // Required when reader loop returns and reader finished case <-h.done: break L diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 69de460639c9..928c09db4e1a 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -80,7 +80,10 @@ func (r *LineReader) Next() ([]byte, int, error) { if err != nil { if r.inBuffer.Len() == 0 || !(err.Error() == "file was removed" || err.Error() == "file inactive") { // buffer为空,或者不属于文件被删除以及文件不活跃的错误,直接返回空 - return nil, 0, err + // return and reset consumed bytes count + sz := r.byteCount + r.byteCount = 0 + return nil, sz, err } logp.Info("LineReader get an advance err: %s, send all %d bytes in buffer", err, r.inBuffer.Len())