Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Credentials issues and queue URL issues #44

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions README.rdoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,23 @@ Read events from Amazon SQS.

type sqs

# url as https://sqs.us-west-2.amazonaws.com/123456789012/myqueue
sqs_url {queue_url}

# following attribute is required if you don't declare a sqs_url
queue_name {queue_instance_key}
# following attribute is required
queue_name {queue_name}

# following attributes are required if you don't use IAM Role nor export credentials to ENV

aws_key_id {your_aws_key_id}
aws_sec_key {your_aws_secret_key}

# following attributes are required if you use FIFO queue

message_group_id {string}

# following attributes are optional

create_queue {boolean}
region {your_region}

# following attributes are required if you use FIFO queue

message_group_id {string}
region {your_region}

### region list ###
# Asia Pacific (Tokyo) [Default] : ap-northeast-1
Expand All @@ -53,6 +51,7 @@ Read events from Amazon SQS.
delay_seconds {delivery_delay_seconds}

include_tag {boolean}

tag_property_name {tag's property name in json}


Expand All @@ -66,7 +65,7 @@ Read events from Amazon SQS.

# following attribute is required

sqs_url {queue_url}
queue_name {queue_name}

# following attributes are required if you don't use IAM Role nor export credentials to ENV

Expand Down
21 changes: 12 additions & 9 deletions lib/fluent/plugin/in_sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class SQSInput < Input
config_param :aws_sec_key, :string, default: nil, secret: true
config_param :tag, :string
config_param :region, :string, default: 'ap-northeast-1'
config_param :sqs_url, :string, default: nil
config_param :queue_name, :string, default: nil
config_param :queue_owner_aws_account_id, :string, default: nil
config_param :receive_interval, :time, default: 0.1
config_param :max_number_of_messages, :integer, default: 10
config_param :wait_time_seconds, :integer, default: 10
Expand All @@ -21,12 +22,6 @@ class SQSInput < Input

def configure(conf)
super

Aws.config = {
access_key_id: @aws_key_id,
secret_access_key: @aws_sec_key,
region: @region
}
end

def start
Expand All @@ -36,11 +31,19 @@ def start
end

def client
@client ||= Aws::SQS::Client.new(stub_responses: @stub_responses)
@client ||= Aws::SQS::Client.new(
access_key_id: @aws_key_id,
secret_access_key: @aws_sec_key,
region: @region,
stub_responses: @stub_responses
)
end

def queue
@queue ||= Aws::SQS::Resource.new(client: client).queue(@sqs_url)
@queue ||= Aws::SQS::Queue.new(
url: client.get_queue_url(queue_name: @queue_name, queue_owner_aws_account_id: @queue_owner_aws_account_id).queue_url,
client: client
)
end

def shutdown
Expand Down
42 changes: 20 additions & 22 deletions lib/fluent/plugin/out_sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class SQSOutput < Output
config_param :aws_key_id, :string, default: nil, secret: true
config_param :aws_sec_key, :string, default: nil, secret: true
config_param :queue_name, :string, default: nil
config_param :sqs_url, :string, default: nil
config_param :create_queue, :bool, default: true
config_param :region, :string, default: 'ap-northeast-1'
config_param :delay_seconds, :integer, default: 0
Expand All @@ -34,38 +33,37 @@ def configure(conf)
compat_parameters_convert(conf, :buffer, :inject)
super

if (!@queue_name.nil? && @queue_name.end_with?('.fifo')) || (!@sqs_url.nil? && @sqs_url.end_with?('.fifo'))
if (!@queue_name.nil? && @queue_name.end_with?('.fifo'))
raise Fluent::ConfigError, 'message_group_id parameter is required for FIFO queue' if @message_group_id.nil?
end

Aws.config = {
access_key_id: @aws_key_id,
secret_access_key: @aws_sec_key,
region: @region
}
end

def client
@client ||= Aws::SQS::Client.new
end

def resource
@resource ||= Aws::SQS::Resource.new(client: client)
@client ||= Aws::SQS::Client.new(
access_key_id: @aws_key_id,
secret_access_key: @aws_sec_key,
region: @region,
stub_responses: @stub_responses
)
end

def queue
return @queue if @queue

@queue = if @create_queue && @queue_name
resource.create_queue(queue_name: @queue_name)
else
@queue = if @sqs_url
resource.queue(@sqs_url)
else
resource.get_queue_by_name(queue_name: @queue_name)
end
end
begin
sqs_url = client.get_queue_url(queue_name: @queue_name, queue_owner_aws_account_id: @queue_owner_aws_account_id).queue_url
rescue Aws::SQS::Errors::NonExistentQueue => e
if @create_queue
sqs_url = client.create_queue(queue_name: @queue_name).queue_url
else
raise
end
end

@queue = Aws::SQS::Queue.new(
url: sqs_url,
client: client
)
@queue
end

Expand Down