Skip to content

Commit

Permalink
in_tail: Fix a stall bug on !follow_inode case (#4327)
Browse files Browse the repository at this point in the history
Fix #3614

Although known stall issues of in_tail on `follow_inode` case are fixed
in v1.16.2, it has still a similar problem on `!follow_inode` case.

In this case, a tail watcher is possible to mark the position entry as
`unwatched` if it's tansitioned to `rotate_wait` state by
`refresh_watcher` even if another newer tail watcher is managing it.
It's hard to occur in usual because `stat_watcher` will be called
immediately after the file is changed  while `refresh_wather` is called
every 60 seconds by default. However, there is a rare possibility that
this order might be swapped especillay if in_tail is busy on processing
large amount of logs. Because in_tail is single threadied, event queues
such as timers or inotify will be stucked in this case.

There is no such problem on `follow_inode` case because position entries
are always marked as `unwatched` before entering `rotate_wait` state.

---------

Signed-off-by: Takuro Ashie <[email protected]>
Co-authored-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
ashie and daipom authored Oct 27, 2023
1 parent dd1a6e5 commit 52e46f0
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ def detach_watcher(tw, ino, close_io = true)

tw.close if close_io

if tw.unwatched && @pf
if @pf && tw.unwatched && (@follow_inode || !@tails[tw.path])
target_info = TargetInfo.new(tw.path, ino)
@pf.unwatch(target_info)
end
Expand Down
105 changes: 105 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3017,4 +3017,109 @@ def test_path_resurrection
)
end
end

sub_test_case "Update watchers for rotation without follow_inodes" do
# The scenario where in_tail wrongly unwatches the PositionEntry.
# This is reported in https://github.com/fluent/fluentd/issues/3614.
def test_refreshTW_during_rotation
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt0",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
# In order to detach the old watcher quickly.
"rotate_wait" => "3s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 6, timeout: 15) do
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1")

# This reproduces the following situation:
# `refresh_watchers` is called during the rotation process and it detects the current file being lost.
# Then it stops and unwatches the TailWatcher.
d.instance.refresh_watchers

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to add the new TailWatcher.
# After `rotate_wait` interval, the PositionEntry is unwatched.
# HOWEVER, the new TailWatcher is still using that PositionEntry, so this breaks the PositionFile!!
# That PositionEntry is removed from `PositionFile::map`, but it is still working and remaining in the real pos file.
sleep 5

# Append to the new current log file.
# The PositionEntry is updated although it does not exist in `PositionFile::map`.
# `PositionFile::map`: empty
# Real pos file: `.../tail.txt 0000000000000016 (inode)`
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"}

# Rotate again
[1, 0].each do |i|
FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}")
end
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to update the TailWatcher.
sleep 3

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"}

# Wait `rotate_wait` for file2 to make sure to close all IO handlers
sleep 3
end

inode_0 = tail_watchers[0]&.ino
inode_1 = tail_watchers[1]&.ino
inode_2 = tail_watchers[2]&.ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"],
tail_watcher_inodes: [inode_0, inode_1, inode_2],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2],
],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end
end
end

0 comments on commit 52e46f0

Please sign in to comment.