Skip to content

Commit

Permalink
Merge pull request fluent#473 from edsiper/HTTP_improvements
Browse files Browse the repository at this point in the history
in_http improvements
  • Loading branch information
repeatedly committed Nov 19, 2014
2 parents 95354c3 + a839c20 commit 2f8f423
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ def initialize
config_param :keepalive_timeout, :time, :default => 10 # TODO default
config_param :backlog, :integer, :default => nil
config_param :add_http_headers, :bool, :default => false
config_param :add_remote_addr, :bool, :default => false
config_param :format, :string, :default => 'default'
config_param :blocking_timeout, :time, :default => 0.5
config_param :cors_allow_origins, :array, :default => nil

def configure(conf)
super
Expand Down Expand Up @@ -85,7 +87,9 @@ def start
super
@km = KeepaliveManager.new(@keepalive_timeout)
#@lsock = Coolio::TCPServer.new(@bind, @port, Handler, @km, method(:on_request), @body_size_limit)
@lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), @body_size_limit, @format, log)
@lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request),
@body_size_limit, @format, log,
@cors_allow_origins)
@lsock.listen(@backlog) unless @backlog.nil?

@loop = Coolio::Loop.new
Expand Down Expand Up @@ -129,6 +133,10 @@ def on_request(path_info, params)
}
end

if @add_remote_addr
record['REMOTE_ADDR'] = params['REMOTE_ADDR']
end

time = if param_time = params['time']
param_time = param_time.to_i
param_time.zero? ? Engine.now : param_time
Expand All @@ -142,10 +150,10 @@ def on_request(path_info, params)
# TODO server error
begin
# Support batched requests
if record.is_a?(Array)
if record.is_a?(Array)
mes = MultiEventStream.new
record.each do |single_record|
single_time = single_record.delete("time") || time
single_time = single_record.delete("time") || time
mes.add(single_time, single_record)
end
router.emit_stream(tag, mes)
Expand Down Expand Up @@ -185,15 +193,15 @@ def parse_params_with_parser(params)
end

class Handler < Coolio::Socket
def initialize(io, km, callback, body_size_limit, format, log)
def initialize(io, km, callback, body_size_limit, format, log, cors_allow_origins)
super(io)
@km = km
@callback = callback
@body_size_limit = body_size_limit
@next_close = false
@format = format
@log = log

@cors_allow_origins = cors_allow_origins
@idle = 0
@km.add(self)

Expand Down Expand Up @@ -228,6 +236,7 @@ def on_message_begin
def on_headers_complete(headers)
expect = nil
size = nil

if @parser.http_version == [1, 1]
@keep_alive = true
else
Expand All @@ -250,6 +259,8 @@ def on_headers_complete(headers)
elsif v =~ /Keep-alive/i
@keep_alive = true
end
when /Origin/i
@origin = v
end
}
if expect
Expand Down Expand Up @@ -278,6 +289,17 @@ def on_body(chunk)
def on_message_complete
return if closing?

# CORS check
# ==========
# For every incoming request, we check if we have some CORS
# restrictions and white listed origins through @cors_allow_origins.
unless @cors_allow_origins.nil?
unless @cors_allow_origins.include?(@origin)
send_response_and_close("403 Forbidden", {'Connection' => 'close'}, "")
return
end
end

@env['REMOTE_ADDR'] = @remote_addr if @remote_addr

uri = URI.parse(@parser.request_url)
Expand Down Expand Up @@ -347,4 +369,3 @@ def send_response_nobody(code, header)
end
end
end

0 comments on commit 2f8f423

Please sign in to comment.