Skip to content

Commit

Permalink
add filter_stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
sonots committed May 1, 2015
1 parent 3a5e749 commit 89230ad
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 0 deletions.
22 changes: 22 additions & 0 deletions example/filter_stdout.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<source>
type dummy
tag dummy
</source>

<filter **>
type stdout
</filter>

<filter **>
type stdout
output_type hash
</filter>

<filter **>
type stdout
format ltsv
</filter>

<match **>
type null
</match>
27 changes: 27 additions & 0 deletions lib/fluent/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ def format(tag, time, record)
end
end

class StdoutFormatter < Formatter
config_param :output_type, :string, :default => 'json'

def configure(conf)
super

@formatter = Plugin.new_formatter(@output_type)
@formatter.configure(conf)
end

def format(tag, time, record)
header = "#{Time.now.localtime} #{tag}: "
"#{header}#{@formatter.format(tag, time, record)}"
end
end

module StructuredFormatMixin
def self.included(klass)
klass.instance_eval {
Expand Down Expand Up @@ -120,6 +136,15 @@ def format_record(record)
end
end

class HashFormatter < Formatter
include HandleTagAndTimeMixin
include StructuredFormatMixin

def format_record(record)
"#{record.to_s}\n"
end
end

class MessagePackFormatter < Formatter
include HandleTagAndTimeMixin
include StructuredFormatMixin
Expand Down Expand Up @@ -203,7 +228,9 @@ def format(tag, time, record)
TEMPLATE_REGISTRY = Registry.new(:formatter_type, 'fluent/plugin/formatter_')
{
'out_file' => Proc.new { OutFileFormatter.new },
'stdout' => Proc.new { StdoutFormatter.new },
'json' => Proc.new { JSONFormatter.new },
'hash' => Proc.new { HashFormatter.new },
'msgpack' => Proc.new { MessagePackFormatter.new },
'ltsv' => Proc.new { LabeledTSVFormatter.new },
'csv' => Proc.new { CsvFormatter.new },
Expand Down
46 changes: 46 additions & 0 deletions lib/fluent/plugin/filter_stdout.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Fluent
class StdoutFilter < Filter
Plugin.register_filter('stdout', self)

# for tests
attr_reader :formatter

config_param :format, :string, :default => 'stdout'
# config_param :output_type, :string, :default => 'json' (StdoutFormatter defines this)

def configure(conf)
super

@formatter = Plugin.new_formatter(@format)
@formatter.configure(conf)
end

def filter_stream(tag, es)
es.each { |time, record|
begin
log.write @formatter.format(tag, time, record)
rescue => e
router.emit_error_event(tag, time, record, e)
end
}
log.flush
es
end
end
end
112 changes: 112 additions & 0 deletions test/plugin/test_filter_stdout.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
require_relative '../helper'
require 'fluent/plugin/filter_stdout'
require 'timecop'
require 'flexmock'

class StdoutFilterTest < Test::Unit::TestCase
include Fluent
include FlexMock::TestCase

def setup
Fluent::Test.setup
Timecop.freeze
end

def teardown
Timecop.return
end

CONFIG = %[
]

def create_driver(conf = CONFIG)
Test::FilterTestDriver.new(StdoutFilter, 'filter.test').configure(conf)
end

def emit(d, msg, time)
d.run {
d.emit(msg, time)
}.filtered_as_array[0][2]
end

def test_through_record
d = create_driver
time = Time.now
filtered = emit(d, {'test' => 'test'}, time)
assert_equal({'test' => 'test'}, filtered)
end

def test_configure_default
d = create_driver
assert_equal 'json', d.instance.formatter.output_type
end

def test_configure_output_type
d = create_driver(CONFIG + "\noutput_type json")
assert_equal 'json', d.instance.formatter.output_type

d = create_driver(CONFIG + "\noutput_type hash")
assert_equal 'hash', d.instance.formatter.output_type

d = create_driver(CONFIG + "\noutput_type ltsv")
assert_equal 'ltsv', d.instance.formatter.output_type

assert_raise(Fluent::ConfigError) do
d = create_driver(CONFIG + "\noutput_type foo")
end
end

def test_output_type_json
d = create_driver(CONFIG + "\noutput_type json")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, time) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out

# NOTE: Float::NAN is not jsonable
d = create_driver(CONFIG + "\noutput_type json")
flexmock(d.instance.router).should_receive(:emit_error_event)
emit(d, {'test' => Float::NAN}, time)
end

def test_output_type_hash
d = create_driver(CONFIG + "\noutput_type hash")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, time) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out

# NOTE: Float::NAN is not jsonable, but hash string can output it.
d = create_driver(CONFIG + "\noutput_type hash")
out = capture_log(d) { emit(d, {'test' => Float::NAN}, time) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
end

# Use include_time_key to output the message's time
def test_include_time_key
d = create_driver(CONFIG + "\noutput_type json\ninclude_time_key true\nutc")
time = Time.now
message_time = Time.parse("2011-01-02 13:14:15 UTC").to_i
out = capture_log(d) { emit(d, {'test' => 'test'}, message_time) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end

# out_stdout formatter itself can also be replaced
def test_format_json
d = create_driver(CONFIG + "\nformat json")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, time) }
assert_equal "{\"test\":\"test\"}\n", out
end

private

# Capture the log output of the block given
def capture_log(d, &block)
tmp = d.instance.log
d.instance.log = StringIO.new
yield
return d.instance.log.string
ensure
d.instance.log = tmp
end
end

0 comments on commit 89230ad

Please sign in to comment.