From ecd7acf06bec3b3e85307ea74ed703ffbaf91543 Mon Sep 17 00:00:00 2001 From: jayjiahua <553544693@qq.com> Date: Tue, 19 Sep 2023 20:34:24 +0800 Subject: [PATCH 1/4] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=BD=93=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E6=9C=80=E5=90=8E=E4=B8=80=E8=A1=8C=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=B2=A1=E6=9C=89=E6=8D=A2=E8=A1=8C=E7=AC=A6=E4=BC=9A=E5=AF=BC?= =?UTF-8?q?=E8=87=B4=E5=8F=A5=E6=9F=84=E4=B8=8D=E9=87=8A=E6=94=BE=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- filebeat/input/log/harvester.go | 6 ++++- libbeat/reader/readfile/line.go | 41 +++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 10b9fad16347..4be06815a92e 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -215,7 +215,9 @@ func (h *Harvester) Run() error { case <-removedCheckTick: // 通过旁路判断文件是否被删除,避免输出堵塞时未能执行 errorChecks 导致已删文件的 fd 没有被及时释放的问题 if h.reader.fileReader.log.fs.Removed() { - logp.Info("Closing harvester because file was removed: %s", source) + logp.Info("Closing harvester because file was removed: %s, wait for 60s", source) + // 等待 60s 才停止,避免日志没来得及采完就关掉导致漏采 + time.Sleep(60 * time.Second) break L } else { logp.Debug("harvester", "File was not removed: %s, check again after %v", source, h.config.ScanFrequency) @@ -232,6 +234,8 @@ func (h *Harvester) Run() error { // Close reader h.reader.Stop() + + logp.Info("Harvester stopped: %s", source) }(h.state.Source) logp.Info("Harvester started for file: %s, offset: %d", h.state.Source, h.state.Offset) diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 843333920280..6629f9eb4f1e 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -78,6 +78,47 @@ func (r *LineReader) Next() ([]byte, int, error) { // read next 'potential' line from input buffer/reader err := r.advance() if err != nil { + if r.inBuffer.Len() > 0 { + // Found EOF and collectOnEOF is true + // -> decode input sequence into outBuffer + // let's take whole buffer len without len(nl) if it ends with it + end := r.inBuffer.Len() + if bytes.HasSuffix(r.inBuffer.Bytes(), r.nl) { + end -= len(r.nl) + } + + sz, err := r.decode(end) + if err != nil { + logp.Err("Error decoding line: %s", err) + // In case of error increase size by unencoded length + sz = r.inBuffer.Len() + } + + // Consume transformed bytes from input buffer + _ = r.inBuffer.Advance(sz) + r.inBuffer.Reset() + + // continue scanning input buffer from last position + 1 + r.inOffset = end - sz + if r.inOffset < 0 { + // fix inOffset if '\n' has encoding > 8bits + firl line has been decoded + r.inOffset = 0 + } + // output buffer contains untile EOF. Extract + // byte slice from buffer and reset output buffer. + bytes, err := r.outBuffer.Collect(r.outBuffer.Len()) + r.outBuffer.Reset() + if err != nil { + // This should never happen as otherwise we have a broken state + panic(err) + } + + // return and reset consumed bytes count + sz = r.byteCount + r.byteCount = 0 + return bytes, sz, err + } + return nil, 0, err } From e4e084ba9d7de82a5ee1fdaca71c065dfa14ea99 Mon Sep 17 00:00:00 2001 From: jayjiahua <553544693@qq.com> Date: Wed, 20 Sep 2023 21:37:12 +0800 Subject: [PATCH 2/4] =?UTF-8?q?chore:=20=E8=B0=83=E6=95=B4=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libbeat/reader/readfile/line.go | 74 +++++++++++++++++---------------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 6629f9eb4f1e..d904375fb3a3 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -78,48 +78,50 @@ func (r *LineReader) Next() ([]byte, int, error) { // read next 'potential' line from input buffer/reader err := r.advance() if err != nil { - if r.inBuffer.Len() > 0 { - // Found EOF and collectOnEOF is true - // -> decode input sequence into outBuffer - // let's take whole buffer len without len(nl) if it ends with it - end := r.inBuffer.Len() - if bytes.HasSuffix(r.inBuffer.Bytes(), r.nl) { - end -= len(r.nl) - } + if r.inBuffer.Len() == 0 { + return nil, 0, err + } - sz, err := r.decode(end) - if err != nil { - logp.Err("Error decoding line: %s", err) - // In case of error increase size by unencoded length - sz = r.inBuffer.Len() - } + logp.Info("LineReader get an advance err: %s, send all %d bytes in buffer", err, r.inBuffer.Len()) + // Found EOF and collectOnEOF is true + // -> decode input sequence into outBuffer + // let's take whole buffer len without len(nl) if it ends with it + end := r.inBuffer.Len() + if bytes.HasSuffix(r.inBuffer.Bytes(), r.nl) { + end -= len(r.nl) + } - // Consume transformed bytes from input buffer - _ = r.inBuffer.Advance(sz) - r.inBuffer.Reset() + sz, err := r.decode(end) + if err != nil { + logp.Err("Error decoding line: %s", err) + // In case of error increase size by unencoded length + sz = r.inBuffer.Len() + } - // continue scanning input buffer from last position + 1 - r.inOffset = end - sz - if r.inOffset < 0 { - // fix inOffset if '\n' has encoding > 8bits + firl line has been decoded - r.inOffset = 0 - } - // output buffer contains untile EOF. Extract - // byte slice from buffer and reset output buffer. - bytes, err := r.outBuffer.Collect(r.outBuffer.Len()) - r.outBuffer.Reset() - if err != nil { - // This should never happen as otherwise we have a broken state - panic(err) - } + // Consume transformed bytes from input buffer + _ = r.inBuffer.Advance(sz) + r.inBuffer.Reset() - // return and reset consumed bytes count - sz = r.byteCount - r.byteCount = 0 - return bytes, sz, err + // continue scanning input buffer from last position + 1 + r.inOffset = end - sz + if r.inOffset < 0 { + // fix inOffset if '\n' has encoding > 8bits + firl line has been decoded + r.inOffset = 0 + } + // output buffer contains untile EOF. Extract + // byte slice from buffer and reset output buffer. + bytes, err := r.outBuffer.Collect(r.outBuffer.Len()) + r.outBuffer.Reset() + if err != nil { + // This should never happen as otherwise we have a broken state + panic(err) } - return nil, 0, err + // return and reset consumed bytes count + sz = r.byteCount + r.byteCount = 0 + return bytes, sz, err + } // Check last decoded byte really being '\n' also unencoded From 0f7c83d647507b3e843e2e59495ee11b70e98a61 Mon Sep 17 00:00:00 2001 From: jayjiahua <553544693@qq.com> Date: Sun, 24 Sep 2023 21:28:41 +0800 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E5=88=A4=E6=96=AD=E6=9D=A1=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libbeat/reader/readfile/line.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index d904375fb3a3..69de460639c9 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -78,7 +78,8 @@ func (r *LineReader) Next() ([]byte, int, error) { // read next 'potential' line from input buffer/reader err := r.advance() if err != nil { - if r.inBuffer.Len() == 0 { + if r.inBuffer.Len() == 0 || !(err.Error() == "file was removed" || err.Error() == "file inactive") { + // buffer为空,或者不属于文件被删除以及文件不活跃的错误,直接返回空 return nil, 0, err } From fec91a04103d402c304def8e0f7e758451d4ceaa Mon Sep 17 00:00:00 2001 From: jayjiahua <553544693@qq.com> Date: Mon, 25 Sep 2023 16:19:13 +0800 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20=E4=BB=A3=E7=A0=81review=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- filebeat/input/log/harvester.go | 7 +++++-- libbeat/reader/readfile/line.go | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 4be06815a92e..0126a02c6599 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -204,6 +204,8 @@ func (h *Harvester) Run() error { removedCheckTick = time.After(h.config.ScanFrequency) } + removedTick := make(<-chan time.Time) + L: for { select { @@ -217,13 +219,14 @@ func (h *Harvester) Run() error { if h.reader.fileReader.log.fs.Removed() { logp.Info("Closing harvester because file was removed: %s, wait for 60s", source) // 等待 60s 才停止,避免日志没来得及采完就关掉导致漏采 - time.Sleep(60 * time.Second) - break L + removedTick = time.After(60 * time.Second) } else { logp.Debug("harvester", "File was not removed: %s, check again after %v", source, h.config.ScanFrequency) // update timer removedCheckTick = time.After(h.config.ScanFrequency) } + case <-removedTick: + break L // Required when reader loop returns and reader finished case <-h.done: break L diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 69de460639c9..928c09db4e1a 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -80,7 +80,10 @@ func (r *LineReader) Next() ([]byte, int, error) { if err != nil { if r.inBuffer.Len() == 0 || !(err.Error() == "file was removed" || err.Error() == "file inactive") { // buffer为空,或者不属于文件被删除以及文件不活跃的错误,直接返回空 - return nil, 0, err + // return and reset consumed bytes count + sz := r.byteCount + r.byteCount = 0 + return nil, sz, err } logp.Info("LineReader get an advance err: %s, send all %d bytes in buffer", err, r.inBuffer.Len())