diff --git a/lib/sneakers/queue.rb b/lib/sneakers/queue.rb index 0bd3a5b7..8e1213b4 100644 --- a/lib/sneakers/queue.rb +++ b/lib/sneakers/queue.rb @@ -55,7 +55,7 @@ def subscribe(worker) # retry queues, etc). handler = handler_klass.new(@channel, queue, worker.opts) - @consumer = queue.subscribe(block: false, manual_ack: @opts[:ack]) do | delivery_info, metadata, msg | + @consumer = queue.subscribe(block: false, manual_ack: @opts[:ack], exclusive: @opts[:exclusive]) do | delivery_info, metadata, msg | worker.do_work(delivery_info, metadata, msg, handler) end nil diff --git a/spec/sneakers/queue_spec.rb b/spec/sneakers/queue_spec.rb index fe203f4f..3132ddd2 100644 --- a/spec/sneakers/queue_spec.rb +++ b/spec/sneakers/queue_spec.rb @@ -6,6 +6,7 @@ { :prefetch => 25, :ack => true, + :exclusive => false, :heartbeat => 2, :vhost => '/', :exchange => "sneakers", @@ -57,7 +58,18 @@ q = Sneakers::Queue.new("downloads", queue_vars) mock(@mkqueue).bind(@mkex, :routing_key => "downloads") - mock(@mkqueue).subscribe(:block => false, :manual_ack => true) + mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: false) + + q.subscribe(@mkworker) + end + + it "supports exclusive consumers" do + mock(@mkchan).queue("downloads", :durable => true) { @mkqueue } + q = Sneakers::Queue.new("downloads", + queue_vars.merge(:exclusive => true)) + + mock(@mkqueue).bind(@mkex, :routing_key => "downloads") + mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: true) q.subscribe(@mkworker) end @@ -69,7 +81,7 @@ mock(@mkqueue).bind(@mkex, :routing_key => "alpha") mock(@mkqueue).bind(@mkex, :routing_key => "beta") - mock(@mkqueue).subscribe(:block => false, :manual_ack => true) + mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: false) q.subscribe(@mkworker) end @@ -80,7 +92,7 @@ queue_vars.merge(:bind_arguments => { "os" => "linux", "cores" => 8 })) mock(@mkqueue).bind(@mkex, :routing_key => "downloads", :arguments => { "os" => "linux", "cores" => 8 }) - mock(@mkqueue).subscribe(:block => false, :manual_ack => true) + mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: false) q.subscribe(@mkworker) end @@ -104,7 +116,7 @@ q = Sneakers::Queue.new("test_nondurable", queue_vars) mock(@mkqueue_nondurable).bind(@mkex, :routing_key => "test_nondurable") - mock(@mkqueue_nondurable).subscribe(:block => false, :manual_ack => true) + mock(@mkqueue_nondurable).subscribe(:block => false, :manual_ack => true, exclusive: false) q.subscribe(@mkworker) myqueue = q.instance_variable_get(:@queue) @@ -153,7 +165,7 @@ queue_name = 'foo' mock(@mkchan).queue(queue_name, :durable => true) { @mkqueue } mock(@mkqueue).bind(@mkex, :routing_key => queue_name) - mock(@mkqueue).subscribe(:block => false, :manual_ack => true) + mock(@mkqueue).subscribe(:block => false, :manual_ack => true, exclusive: false) my_vars = queue_vars.merge(:connection => @external_connection) @q = Sneakers::Queue.new(queue_name, my_vars)