diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb
index 0e9dd17f98..52a24e0747 100644
--- a/test/plugin/test_out_file.rb
+++ b/test/plugin/test_out_file.rb
@@ -5,6 +5,7 @@
require 'time'
require 'timecop'
require 'zlib'
+require 'zstd-ruby'
require 'fluent/file_wrapper'
class FileOutputTest < Test::Unit::TestCase
@@ -397,20 +398,32 @@ def create_driver(conf = CONFIG, opts = {})
end
end
- def check_gzipped_result(path, expect)
+ def check_zipped_result(path, expect, type: :gzip)
# Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790
# Following code from https://www.ruby-forum.com/topic/971591#979520
result = ''
- File.open(path, "rb") { |io|
- loop do
- gzr = Zlib::GzipReader.new(StringIO.new(io.read))
- result << gzr.read
- unused = gzr.unused
- gzr.finish
- break if unused.nil?
- io.pos -= unused.length
- end
- }
+ if type == :gzip || type == :gz
+ File.open(path, "rb") { |io|
+ loop do
+ gzr = Zlib::GzipReader.new(StringIO.new(io.read))
+ result << gzr.read
+ unused = gzr.unused
+ gzr.finish
+ break if unused.nil?
+ io.pos -= unused.length
+ end
+ }
+ elsif type == :zstd
+ File.open(path, "rb") { |io|
+ loop do
+ reader = Zstd::StreamReader.new(StringIO.new(io.read))
+ result << reader.read(1024)
+ break if io.eof?
+ end
+ }
+ else
+ raise "Invalid compression type to check"
+ end
assert_equal expect, result
end
@@ -421,7 +434,7 @@ def check_result(path, expect)
end
sub_test_case 'write' do
- test 'basic case' do
+ test 'basic case with gz' do
d = create_driver
assert_false File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz")
@@ -433,7 +446,29 @@ def check_result(path, expect)
end
assert File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz")
- check_gzipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}])
+ check_zipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}])
+ end
+
+ test 'write with zstd compression' do
+ d = create_driver %[
+ path #{TMP_DIR}/out_file_test
+ compress zstd
+ utc
+
+ timekey_use_utc true
+
+ ]
+
+ assert_false File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.zstd")
+
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: 'test') do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
+
+ assert File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.zstd")
+ check_zipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.zstd", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}], type: :zstd)
end
end
@@ -481,7 +516,7 @@ def parse_system(text)
assert File.exist?("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz")
- check_gzipped_result("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n])
+ check_zipped_result("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n])
dir_mode = "%o" % File::stat(TMP_DIR_WITH_SYSTEM).mode
assert_equal(OVERRIDE_DIR_PERMISSION, dir_mode[-3, 3].to_i)
file_mode = "%o" % File::stat("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz").mode
@@ -500,7 +535,7 @@ def parse_system(text)
end
path = d.instance.last_written_path
- check_gzipped_result(path, %[#{Yajl.dump({"a" => 1, 'time' => time.to_i})}#{@default_newline}] + %[#{Yajl.dump({"a" => 2, 'time' => time.to_i})}#{@default_newline}])
+ check_zipped_result(path, %[#{Yajl.dump({"a" => 1, 'time' => time.to_i})}#{@default_newline}] + %[#{Yajl.dump({"a" => 2, 'time' => time.to_i})}#{@default_newline}])
end
test 'ltsv' do
@@ -513,7 +548,7 @@ def parse_system(text)
end
path = d.instance.last_written_path
- check_gzipped_result(path, %[a:1\ttime:2011-01-02T13:14:15Z#{@default_newline}] + %[a:2\ttime:2011-01-02T13:14:15Z#{@default_newline}])
+ check_zipped_result(path, %[a:1\ttime:2011-01-02T13:14:15Z#{@default_newline}] + %[a:2\ttime:2011-01-02T13:14:15Z#{@default_newline}])
end
test 'single_value' do
@@ -526,7 +561,7 @@ def parse_system(text)
end
path = d.instance.last_written_path
- check_gzipped_result(path, %[1#{@default_newline}] + %[2#{@default_newline}])
+ check_zipped_result(path, %[1#{@default_newline}] + %[2#{@default_newline}])
end
end
@@ -547,23 +582,24 @@ def parse_system(text)
path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110102_0.log.gz", path
- check_gzipped_result(path, formatted_lines)
+ check_zipped_result(path, formatted_lines)
assert_equal 1, Dir.glob("#{TMP_DIR}/out_file_test.*").size
path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110102_1.log.gz", path
- check_gzipped_result(path, formatted_lines)
+ check_zipped_result(path, formatted_lines)
assert_equal 2, Dir.glob("#{TMP_DIR}/out_file_test.*").size
path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110102_2.log.gz", path
- check_gzipped_result(path, formatted_lines)
+ check_zipped_result(path, formatted_lines)
assert_equal 3, Dir.glob("#{TMP_DIR}/out_file_test.*").size
end
data(
- "with compression" => true,
- "without compression" => false,
+ "without compression" => "text",
+ "with gzip compression" => "gz",
+ "with zstd compression" => "zstd"
)
test 'append' do |compression|
time = event_time("2011-01-02 13:14:15 UTC")
@@ -578,8 +614,8 @@ def parse_system(text)
timekey_use_utc true
]
- if compression
- config << " compress gz"
+ if compression != :text
+ config << " compress #{compression}"
end
d = create_driver(config)
d.run(default_tag: 'test'){
@@ -590,16 +626,16 @@ def parse_system(text)
}
log_file_name = "out_file_test.20110102.log"
- if compression
- log_file_name << ".gz"
+ if compression != "text"
+ log_file_name << ".#{compression}"
end
1.upto(3) do |i|
path = write_once.call
assert_equal "#{TMP_DIR}/#{log_file_name}", path
expect = formatted_lines * i
- if compression
- check_gzipped_result(path, expect)
+ if compression != "text"
+ check_zipped_result(path, expect, type: compression.to_sym)
else
check_result(path, expect)
end
@@ -630,15 +666,15 @@ def parse_system(text)
path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
- check_gzipped_result(path, formatted_lines)
+ check_zipped_result(path, formatted_lines)
path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
- check_gzipped_result(path, formatted_lines * 2)
+ check_zipped_result(path, formatted_lines * 2)
path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
- check_gzipped_result(path, formatted_lines * 3)
+ check_zipped_result(path, formatted_lines * 3)
end
end
@@ -667,15 +703,15 @@ def parse_system(text)
path = write_once.call
# Rotated at 2011-01-02 17:00:00+02:00
assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path
- check_gzipped_result(path, formatted_lines)
+ check_zipped_result(path, formatted_lines)
path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path
- check_gzipped_result(path, formatted_lines * 2)
+ check_zipped_result(path, formatted_lines * 2)
path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path
- check_gzipped_result(path, formatted_lines * 3)
+ check_zipped_result(path, formatted_lines * 3)
end
end
@@ -871,6 +907,10 @@ def run_and_check(d, symlink_path)
test 'returns .gz for gzip' do
assert_equal '.gz', @i.compression_suffix(:gzip)
end
+
+ test 'returns .zstd for zstd' do
+ assert_equal '.zstd', @i.compression_suffix(:zstd)
+ end
end
sub_test_case '#generate_path_template' do
diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb
index 438caf8fa2..45f7f7b1cc 100644
--- a/test/plugin/test_out_forward.rb
+++ b/test/plugin/test_out_forward.rb
@@ -342,6 +342,15 @@ def try_write(chunk)
assert_equal :gzip, node.instance_variable_get(:@compress)
end
+ test 'set_compress_is_zstd' do
+ @d = d = create_driver(config + %[compress zstd])
+ assert_equal :zstd, d.instance.compress
+ assert_equal :zstd, d.instance.buffer.compress
+
+ node = d.instance.nodes.first
+ assert_equal :zstd, node.instance_variable_get(:@compress)
+ end
+
test 'set_compress_is_gzip_in_buffer_section' do
mock = flexmock($log)
mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in ")
@@ -359,6 +368,23 @@ def try_write(chunk)
assert_equal :text, node.instance_variable_get(:@compress)
end
+ test 'set_compress_is_zstd_in_buffer_section' do
+ mock = flexmock($log)
+ mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in ")
+
+ @d = d = create_driver(config + %[
+
+ type memory
+ compress zstd
+
+ ])
+ assert_equal :text, d.instance.compress
+ assert_equal :zstd, d.instance.buffer.compress
+
+ node = d.instance.nodes.first
+ assert_equal :text, node.instance_variable_get(:@compress)
+ end
+
test 'phi_failure_detector disabled' do
@d = d = create_driver(config + %[phi_failure_detector false \n phi_threshold 0])
node = d.instance.nodes.first
@@ -549,6 +575,36 @@ def try_write(chunk)
assert_equal ['test', time, records[1]], events[1]
end
+ test 'send_comprssed_message_pack_stream_if_compress_is_zstd' do
+ target_input_driver = create_target_input_driver
+
+ @d = d = create_driver(config + %[
+ flush_interval 1s
+ compress zstd
+ ])
+
+ time = event_time('2011-01-02 13:14:15 UTC')
+
+ records = [
+ {"a" => 1},
+ {"a" => 2}
+ ]
+ target_input_driver.run(expect_records: 2) do
+ d.run(default_tag: 'test') do
+ records.each do |record|
+ d.feed(time, record)
+ end
+ end
+ end
+
+ event_streams = target_input_driver.event_streams
+ assert_true event_streams[0][1].is_a?(Fluent::CompressedMessagePackEventStream)
+
+ events = target_input_driver.events
+ assert_equal ['test', time, records[0]], events[0]
+ assert_equal ['test', time, records[1]], events[1]
+ end
+
test 'send_to_a_node_supporting_responses' do
target_input_driver = create_target_input_driver