Skip to content

Commit

Permalink
compressable:Zstd comp support (#4657)
Browse files Browse the repository at this point in the history
**Which issue(s) this PR fixes**: 
Fixes #4162 

**What this PR does / why we need it**: 
Adds new compression method support to handle messages
**Docs Changes**:
TODO
**Release Note**: 
N/A

---------

Signed-off-by: Athish Pranav D <[email protected]>
Co-authored-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
Athishpranav2003 and daipom authored Jan 21, 2025
1 parent 29189a1 commit 30c3ce0
Show file tree
Hide file tree
Showing 18 changed files with 607 additions and 103 deletions.
1 change: 1 addition & 0 deletions fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"])
gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"])
gem.add_runtime_dependency("webrick", ["~> 1.4"])
gem.add_runtime_dependency("zstd-ruby", ["~> 1.5"])

# gems that aren't default gems as of Ruby 3.4
gem.add_runtime_dependency("base64", ["~> 0.2"])
Expand Down
11 changes: 6 additions & 5 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ def to_msgpack_stream(time_int: false, packer: nil)
out.full_pack
end

def to_compressed_msgpack_stream(time_int: false, packer: nil)
def to_compressed_msgpack_stream(time_int: false, packer: nil, type: :gzip)
packed = to_msgpack_stream(time_int: time_int, packer: packer)
compress(packed)
compress(packed, type: type)
end

def to_msgpack_stream_forced_integer(packer: nil)
Expand Down Expand Up @@ -268,10 +268,11 @@ def to_msgpack_stream(time_int: false, packer: nil)
end

class CompressedMessagePackEventStream < MessagePackEventStream
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
super
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records)
@decompressed_data = nil
@compressed_data = data
@type = compress
end

def empty?
Expand Down Expand Up @@ -303,7 +304,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil)

def ensure_decompressed!
return if @decompressed_data
@data = @decompressed_data = decompress(@data)
@data = @decompressed_data = decompress(@data, type: @type)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
config_param :queued_chunks_limit_size, :integer, default: nil

desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text
config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text

desc 'If true, chunks are thrown away when unrecoverable error happens'
config_param :disable_chunk_backup, :bool, default: false
Expand Down
71 changes: 64 additions & 7 deletions lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ def initialize(metadata, compress: :text)
@size = 0
@created_at = Fluent::Clock.real_now
@modified_at = Fluent::Clock.real_now

extend Decompressable if compress == :gzip
if compress == :gzip
extend GzipDecompressable
elsif compress == :zstd
extend ZstdDecompressable
end
end

attr_reader :unique_id, :metadata, :state
Expand All @@ -85,7 +88,7 @@ def modified_at

# data is array of formatted record string
def append(data, **kwargs)
raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
raise ArgumentError, "`compress: #{kwargs[:compress]}` can be used for Compressable module" if kwargs[:compress] == :gzip || kwargs[:compress] == :zstd
begin
adding = data.join.force_encoding(Encoding::ASCII_8BIT)
rescue
Expand Down Expand Up @@ -172,23 +175,23 @@ def purge
end

def read(**kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
raise NotImplementedError, "Implement this method in child class"
end

def open(**kwargs, &block)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
raise NotImplementedError, "Implement this method in child class"
end

def write_to(io, **kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
open do |i|
IO.copy_stream(i, io)
end
end

module Decompressable
module GzipDecompressable
include Fluent::Plugin::Compressable

def append(data, **kwargs)
Expand Down Expand Up @@ -241,6 +244,60 @@ def write_to(io, **kwargs)
end
end
end

module ZstdDecompressable
include Fluent::Plugin::Compressable

def append(data, **kwargs)
if kwargs[:compress] == :zstd
io = StringIO.new
stream = Zstd::StreamWriter.new(io)
data.each do |d|
stream.write(d)
end
stream.finish
concat(io.string, data.size)
else
super
end
end

def open(**kwargs, &block)
if kwargs[:compressed] == :zstd
super
else
super(**kwargs) do |chunk_io|
output_io = if chunk_io.is_a?(StringIO)
StringIO.new
else
Tempfile.new('decompressed-data')
end
output_io.binmode if output_io.is_a?(Tempfile)
decompress(input_io: chunk_io, output_io: output_io, type: :zstd)
output_io.seek(0, IO::SEEK_SET)
yield output_io
end
end
end

def read(**kwargs)
if kwargs[:compressed] == :zstd
super
else
decompress(super,type: :zstd)
end
end

def write_to(io, **kwargs)
open(compressed: :zstd) do |chunk_io|
if kwargs[:compressed] == :zstd
IO.copy_stream(chunk_io, io)
else
decompress(input_io: chunk_io, output_io: io, type: :zstd)
end
end
end
end
end
end
end
Expand Down
90 changes: 68 additions & 22 deletions lib/fluent/plugin/compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,81 +16,127 @@

require 'stringio'
require 'zlib'
require 'zstd-ruby'

module Fluent
module Plugin
module Compressable
def compress(data, **kwargs)
def compress(data, type: :gzip, **kwargs)
output_io = kwargs[:output_io]
io = output_io || StringIO.new
Zlib::GzipWriter.wrap(io) do |gz|
gz.write data
if type == :gzip
writer = Zlib::GzipWriter.new(io)
elsif type == :zstd
writer = Zstd::StreamWriter.new(io)
else
raise ArgumentError, "Unknown compression type: #{type}"
end

writer.write(data)
writer.finish
output_io || io.string
end

# compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`
# https://www.ruby-forum.com/topic/971591#979503
def decompress(compressed_data = nil, output_io: nil, input_io: nil)
def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip)
case
when input_io && output_io
io_decompress(input_io, output_io)
io_decompress(input_io, output_io, type)
when input_io
output_io = StringIO.new
io = io_decompress(input_io, output_io)
io = io_decompress(input_io, output_io, type)
io.string
when compressed_data.nil? || compressed_data.empty?
# check compressed_data(String) is 0 length
compressed_data
when output_io
# execute after checking compressed_data is empty or not
io = StringIO.new(compressed_data)
io_decompress(io, output_io)
io_decompress(io, output_io, type)
else
string_decompress(compressed_data)
string_decompress(compressed_data, type)
end
end

private

def string_decompress(compressed_data)
def string_decompress_gzip(compressed_data)
io = StringIO.new(compressed_data)

out = ''
loop do
gz = Zlib::GzipReader.new(io)
out << gz.read
unused = gz.unused
gz.finish

reader = Zlib::GzipReader.new(io)
out << reader.read
unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
io.pos -= adjust
end
break if io.eof?
end
out
end

def string_decompress_zstd(compressed_data)
io = StringIO.new(compressed_data)
out = ''
loop do
reader = Zstd::StreamReader.new(io)
# Zstd::StreamReader needs to specify the size of the buffer
out << reader.read(1024)
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
break if io.eof?
end
out
end

def io_decompress(input, output)
def string_decompress(compressed_data, type = :gzip)
if type == :gzip
string_decompress_gzip(compressed_data)
elsif type == :zstd
string_decompress_zstd(compressed_data)
else
raise ArgumentError, "Unknown compression type: #{type}"
end
end

def io_decompress_gzip(input, output)
loop do
gz = Zlib::GzipReader.new(input)
v = gz.read
reader = Zlib::GzipReader.new(input)
v = reader.read
output.write(v)
unused = gz.unused
gz.finish

unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
input.pos -= adjust
end
break if input.eof?
end
output
end

def io_decompress_zstd(input, output)
loop do
reader = Zstd::StreamReader.new(input)
# Zstd::StreamReader needs to specify the size of the buffer
v = reader.read(1024)
output.write(v)
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
break if input.eof?
end
output
end

def io_decompress(input, output, type = :gzip)
if type == :gzip
io_decompress_gzip(input, output)
elsif type == :zstd
io_decompress_zstd(input, output)
else
raise ArgumentError, "Unknown compression type: #{type}"
end
end
end
end
end
12 changes: 8 additions & 4 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,14 @@ def on_message(msg, chunk_size, conn)
case entries
when String
# PackedForward
option = msg[2]
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
option = msg[2] || {}
size = option['size'] || 0

if option['compressed'] && option['compressed'] != 'text'
es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym)
else
es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
end
es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
if @enable_field_injection
es = add_source_info(es, conn)
Expand Down
Loading

0 comments on commit 30c3ce0

Please sign in to comment.