Skip to content

Commit 4372698

Browse files
ashiedaipom
andcommitted
in_tail: Manage tail watchers that are rorate_wait state too (#4334)
After a tail watcher transitions to `rotate_wait` state, the `rotate_wait` timer is no longer managed by in_tail, it might cause unexpected behaviour. e.g.) * It's never unwatched when shutdown occurs before `rotate_wait` passed. * Needless `rotate_wait` timers are executed when it detects more rotations. This patch fixes such unexpected behaviour. Note: The comment about `detach_watcher` was added in 76f246a. At that time, closing was done by event-loop. Now, the situation is completely different, so it should be removed. --------- Signed-off-by: Takuro Ashie <[email protected]> Co-authored-by: Daijiro Fukuda <[email protected]>
1 parent 5ee04f0 commit 4372698

File tree

2 files changed

+184
-9
lines changed

2 files changed

+184
-9
lines changed

Diff for: lib/fluent/plugin/in_tail.rb

+18-6
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def initialize
5252
super
5353
@paths = []
5454
@tails = {}
55+
@tails_rotate_wait = {}
5556
@pf_file = nil
5657
@pf = nil
5758
@ignore_list = []
@@ -267,6 +268,9 @@ def shutdown
267268
@shutdown_start_time = Fluent::Clock.now
268269
# during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
269270
stop_watchers(existence_path, immediate: true, remove_watcher: false)
271+
@tails_rotate_wait.keys.each do |tw|
272+
detach_watcher(tw, @tails_rotate_wait[tw][:ino], false)
273+
end
270274
@pf_file.close if @pf_file
271275

272276
super
@@ -275,6 +279,7 @@ def shutdown
275279
def close
276280
super
277281
# close file handles after all threads stopped (in #close of thread plugin helper)
282+
# It may be because we need to wait IOHanlder.ready_to_shutdown()
278283
close_watcher_handles
279284
end
280285

@@ -516,6 +521,9 @@ def close_watcher_handles
516521
tw.close
517522
end
518523
end
524+
@tails_rotate_wait.keys.each do |tw|
525+
tw.close
526+
end
519527
end
520528

521529
# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
@@ -570,10 +578,6 @@ def update_watcher(tail_watcher, pe, new_inode)
570578
detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode)
571579
end
572580

573-
# TailWatcher#close is called by another thread at shutdown phase.
574-
# It causes 'can't modify string; temporarily locked' error in IOHandler
575-
# so adding close_io argument to avoid this problem.
576-
# At shutdown, IOHandler's io will be released automatically after detached the event loop
577581
def detach_watcher(tw, ino, close_io = true)
578582
if @follow_inodes && tw.ino != ino
579583
log.warn("detach_watcher could be detaching an unexpected tail_watcher with a different ino.",
@@ -604,22 +608,30 @@ def detach_watcher_after_rotate_wait(tw, ino)
604608
if @open_on_every_update
605609
# Detach now because it's already closed, waiting it doesn't make sense.
606610
detach_watcher(tw, ino)
607-
elsif throttling_is_enabled?(tw)
611+
end
612+
613+
return if @tails_rotate_wait[tw]
614+
615+
if throttling_is_enabled?(tw)
608616
# When the throttling feature is enabled, it might not reach EOF yet.
609617
# Should ensure to read all contents before closing it, with keeping throttling.
610618
start_time_to_wait = Fluent::Clock.now
611619
timer = timer_execute(:in_tail_close_watcher, 1, repeat: true) do
612620
elapsed = Fluent::Clock.now - start_time_to_wait
613621
if tw.eof? && elapsed >= @rotate_wait
614622
timer.detach
623+
@tails_rotate_wait.delete(tw)
615624
detach_watcher(tw, ino)
616625
end
617626
end
627+
@tails_rotate_wait[tw] = { ino: ino, timer: timer }
618628
else
619629
# when the throttling feature isn't enabled, just wait @rotate_wait
620-
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
630+
timer = timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
631+
@tails_rotate_wait.delete(tw)
621632
detach_watcher(tw, ino)
622633
end
634+
@tails_rotate_wait[tw] = { ino: ino, timer: timer }
623635
end
624636
end
625637

Diff for: test/plugin/test_in_tail.rb

+166-3
Original file line numberDiff line numberDiff line change
@@ -3016,6 +3016,92 @@ def test_path_resurrection
30163016
},
30173017
)
30183018
end
3019+
3020+
def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait
3021+
config = config_element(
3022+
"ROOT",
3023+
"",
3024+
{
3025+
"path" => "#{@tmp_dir}/tail.txt*",
3026+
"pos_file" => "#{@tmp_dir}/tail.pos",
3027+
"tag" => "t1",
3028+
"format" => "none",
3029+
"read_from_head" => "true",
3030+
"follow_inodes" => "true",
3031+
"rotate_wait" => "3s",
3032+
"refresh_interval" => "1h",
3033+
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
3034+
# so disable it in order to reproduce the same condition stably.
3035+
"enable_stat_watcher" => "false",
3036+
}
3037+
)
3038+
d = create_driver(config, false)
3039+
3040+
tail_watchers = []
3041+
stub.proxy(d.instance).setup_watcher do |tw|
3042+
tail_watchers.append(tw)
3043+
mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls.
3044+
tw
3045+
end
3046+
3047+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"}
3048+
3049+
d.run(expect_records: 6, timeout: 15) do
3050+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"}
3051+
3052+
sleep 1.5 # Need to be larger than 1s (the interval of watch_timer)
3053+
3054+
FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1")
3055+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"}
3056+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"}
3057+
3058+
sleep 1.5 # Need to be larger than 1s (the interval of watch_timer)
3059+
3060+
# Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher)
3061+
[1, 0].each do |i|
3062+
FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}")
3063+
end
3064+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"}
3065+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"}
3066+
3067+
# Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate.
3068+
# (Note: Currently, there is no harm in duplicate calls)
3069+
sleep 4
3070+
end
3071+
3072+
inode_0 = tail_watchers[0]&.ino
3073+
inode_1 = tail_watchers[1]&.ino
3074+
inode_2 = tail_watchers[2]&.ino
3075+
record_values = d.events.collect { |event| event[2]["message"] }.sort
3076+
position_entries = []
3077+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
3078+
f.readlines(chomp: true).each do |line|
3079+
values = line.split("\t")
3080+
position_entries.append([values[0], values[1], values[2].to_i(16)])
3081+
end
3082+
end
3083+
3084+
assert_equal(
3085+
{
3086+
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"],
3087+
tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"],
3088+
tail_watcher_inodes: [inode_0, inode_1, inode_2],
3089+
tail_watcher_io_handler_opened_statuses: [false, false, false],
3090+
position_entries: [
3091+
["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_0],
3092+
["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_1],
3093+
["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2],
3094+
],
3095+
},
3096+
{
3097+
record_values: record_values,
3098+
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
3099+
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
3100+
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
3101+
position_entries: position_entries
3102+
},
3103+
)
3104+
end
30193105
end
30203106

30213107
sub_test_case "Update watchers for rotation without follow_inodes" do
@@ -3084,9 +3170,6 @@ def test_refreshTW_during_rotation
30843170
sleep 3
30853171

30863172
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"}
3087-
3088-
# Wait `rotate_wait` for file2 to make sure to close all IO handlers
3089-
sleep 3
30903173
end
30913174

30923175
inode_0 = tail_watchers[0]&.ino
@@ -3121,5 +3204,85 @@ def test_refreshTW_during_rotation
31213204
},
31223205
)
31233206
end
3207+
3208+
def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait
3209+
config = config_element(
3210+
"ROOT",
3211+
"",
3212+
{
3213+
"path" => "#{@tmp_dir}/tail.txt0",
3214+
"pos_file" => "#{@tmp_dir}/tail.pos",
3215+
"tag" => "t1",
3216+
"format" => "none",
3217+
"read_from_head" => "true",
3218+
"rotate_wait" => "3s",
3219+
"refresh_interval" => "1h",
3220+
}
3221+
)
3222+
d = create_driver(config, false)
3223+
3224+
tail_watchers = []
3225+
stub.proxy(d.instance).setup_watcher do |tw|
3226+
tail_watchers.append(tw)
3227+
mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls.
3228+
tw
3229+
end
3230+
3231+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"}
3232+
3233+
d.run(expect_records: 6, timeout: 15) do
3234+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"}
3235+
3236+
sleep 1.5 # Need to be larger than 1s (the interval of watch_timer)
3237+
3238+
FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1")
3239+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"}
3240+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"}
3241+
3242+
sleep 1.5 # Need to be larger than 1s (the interval of watch_timer)
3243+
3244+
# Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher)
3245+
[1, 0].each do |i|
3246+
FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}")
3247+
end
3248+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"}
3249+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"}
3250+
3251+
# Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate.
3252+
# (Note: Currently, there is no harm in duplicate calls)
3253+
sleep 4
3254+
end
3255+
3256+
inode_0 = tail_watchers[0]&.ino
3257+
inode_1 = tail_watchers[1]&.ino
3258+
inode_2 = tail_watchers[2]&.ino
3259+
record_values = d.events.collect { |event| event[2]["message"] }.sort
3260+
position_entries = []
3261+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
3262+
f.readlines(chomp: true).each do |line|
3263+
values = line.split("\t")
3264+
position_entries.append([values[0], values[1], values[2].to_i(16)])
3265+
end
3266+
end
3267+
3268+
assert_equal(
3269+
{
3270+
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"],
3271+
tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"],
3272+
tail_watcher_inodes: [inode_0, inode_1, inode_2],
3273+
tail_watcher_io_handler_opened_statuses: [false, false, false],
3274+
position_entries: [
3275+
["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2],
3276+
],
3277+
},
3278+
{
3279+
record_values: record_values,
3280+
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
3281+
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
3282+
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
3283+
position_entries: position_entries
3284+
},
3285+
)
3286+
end
31243287
end
31253288
end

0 commit comments

Comments
 (0)