Skip to content

Commit

Permalink
Add options for making consumer exclusive
Browse files Browse the repository at this point in the history
  • Loading branch information
anderslemke committed Mar 19, 2019
1 parent 24abf5f commit a09520f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
2 changes: 1 addition & 1 deletion lib/sneakers/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def subscribe(worker)
# retry queues, etc).
handler = handler_klass.new(@channel, queue, worker.opts)

@consumer = queue.subscribe(block: false, manual_ack: @opts[:ack]) do | delivery_info, metadata, msg |
@consumer = queue.subscribe(block: false, manual_ack: @opts[:ack], exclusive: @opts[:exclusive]) do | delivery_info, metadata, msg |
worker.do_work(delivery_info, metadata, msg, handler)
end
nil
Expand Down
22 changes: 17 additions & 5 deletions spec/sneakers/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{
:prefetch => 25,
:ack => true,
:exclusive => false,
:heartbeat => 2,
:vhost => '/',
:exchange => "sneakers",
Expand Down Expand Up @@ -57,7 +58,18 @@
q = Sneakers::Queue.new("downloads", queue_vars)

mock(@mkqueue).bind(@mkex, :routing_key => "downloads")
mock(@mkqueue).subscribe(:block => false, :manual_ack => true)
mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: false)

q.subscribe(@mkworker)
end

it "supports exclusive consumers" do
mock(@mkchan).queue("downloads", :durable => true) { @mkqueue }
q = Sneakers::Queue.new("downloads",
queue_vars.merge(:exclusive => true))

mock(@mkqueue).bind(@mkex, :routing_key => "downloads")
mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: true)

q.subscribe(@mkworker)
end
Expand All @@ -69,7 +81,7 @@

mock(@mkqueue).bind(@mkex, :routing_key => "alpha")
mock(@mkqueue).bind(@mkex, :routing_key => "beta")
mock(@mkqueue).subscribe(:block => false, :manual_ack => true)
mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: false)

q.subscribe(@mkworker)
end
Expand All @@ -80,7 +92,7 @@
queue_vars.merge(:bind_arguments => { "os" => "linux", "cores" => 8 }))

mock(@mkqueue).bind(@mkex, :routing_key => "downloads", :arguments => { "os" => "linux", "cores" => 8 })
mock(@mkqueue).subscribe(:block => false, :manual_ack => true)
mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: false)

q.subscribe(@mkworker)
end
Expand All @@ -104,7 +116,7 @@
q = Sneakers::Queue.new("test_nondurable", queue_vars)

mock(@mkqueue_nondurable).bind(@mkex, :routing_key => "test_nondurable")
mock(@mkqueue_nondurable).subscribe(:block => false, :manual_ack => true)
mock(@mkqueue_nondurable).subscribe(:block => false, :manual_ack => true, exclusive: false)

q.subscribe(@mkworker)
myqueue = q.instance_variable_get(:@queue)
Expand Down Expand Up @@ -153,7 +165,7 @@
queue_name = 'foo'
mock(@mkchan).queue(queue_name, :durable => true) { @mkqueue }
mock(@mkqueue).bind(@mkex, :routing_key => queue_name)
mock(@mkqueue).subscribe(:block => false, :manual_ack => true)
mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: false)

my_vars = queue_vars.merge(:connection => @external_connection)
@q = Sneakers::Queue.new(queue_name, my_vars)
Expand Down

0 comments on commit a09520f

Please sign in to comment.