From be126d6f544b0bad4e7b9f8ea31d6bf0b5108e35 Mon Sep 17 00:00:00 2001 From: Zach Lintz Date: Tue, 4 Dec 2018 23:41:12 -0700 Subject: [PATCH] feat: add passive option to queueFsm allows for checkQueue instead of assertQueue when clients have restricted permissions fixes issue #183 --- spec/behavior/queue.spec.js | 56 ++++++++++++++++++++++++ spec/integration/addPassiveQueue.spec.js | 49 +++++++++++++++++++++ src/amqp/queue.js | 24 ++++++---- src/queueFsm.js | 1 + 4 files changed, 122 insertions(+), 8 deletions(-) create mode 100644 spec/behavior/queue.spec.js create mode 100644 spec/integration/addPassiveQueue.spec.js diff --git a/spec/behavior/queue.spec.js b/spec/behavior/queue.spec.js new file mode 100644 index 0000000..cfdc902 --- /dev/null +++ b/spec/behavior/queue.spec.js @@ -0,0 +1,56 @@ +require('../setup.js'); +var ampqQueue = require('../../src/amqp/queue'); + +describe('AMQP Queue', function () { + let amqpChannelMock, options, topology, serializers; + + beforeEach(() => { + amqpChannelMock = { + ack: sinon.stub().callsFake(() => Promise.resolve()), + nack: sinon.stub().callsFake(() => Promise.resolve()), + checkQueue: sinon.stub().callsFake(() => Promise.resolve()), + assertQueue: sinon.stub().callsFake(() => Promise.resolve()) + }; + + options = { + uniqueName: 'one-unique-name-coming-up' + }; + + topology = { + connection: { + getChannel: sinon.stub().callsFake(() => Promise.resolve(amqpChannelMock)) + } + }; + + serializers = sinon.stub(); + }); + + describe('when executing "define"', () => { + describe('when options.passive is not set', () => { + it('calls assertQueue', function () { + return ampqQueue(options, topology, serializers) + .then((instance) => { + return instance.define(); + }) + .then(() => { + amqpChannelMock.checkQueue.calledOnce.should.equal(false); + amqpChannelMock.assertQueue.calledOnce.should.equal(true); + }); + }); + }); + + describe('when options.passive is true', function () { + it('calls checkQueue instead of assertQueue', async () => { + options.passive = true; + return ampqQueue(options, topology, serializers) + .then((instance) => { + return instance.define(); + }) + .then(() => { + amqpChannelMock.checkQueue.calledOnce.should.equal(true); + amqpChannelMock.assertQueue.calledOnce.should.equal(false); + }); + }); + }); + }); +}); diff --git a/spec/integration/addPassiveQueue.spec.js b/spec/integration/addPassiveQueue.spec.js new file mode 100644 index 0000000..a47418d --- /dev/null +++ b/spec/integration/addPassiveQueue.spec.js @@ -0,0 +1,49 @@ +require('../setup'); +const rabbit = require('../../src/index.js'); + +describe('Adding Queues', function () { + describe('when the queue does not already exist', function () { + it('should error on addQueue in passive mode', function () { + return rabbit.configure({ + connection: { + name: 'passiveErrorWithNoQueue' + } + }).then(() => { + return rabbit.addQueue('no-queue-here', { passive: true }, 'passiveErrorWithNoQueue') + .then( + () => { throw new Error('Should not have succeeded in the checkQueue call'); }, + (err) => { + err.toString().should.contain("Failed to create queue 'no-queue-here' on connection 'passiveErrorWithNoQueue' with 'Error: Operation failed: QueueDeclare; 404 (NOT-FOUND)"); + }); + }); + }); + after(function () { + rabbit.reset(); + return rabbit.shutdown('passiveErrorWithNoQueue'); + }); + }); + + describe('when the queue does exist', function () { + const existingQueueName = 'totes-exists-already'; + it('should NOT error on addQueue when in passive mode', function () { + return rabbit.configure({ + connection: { + name: 'passiveEnabledWithExistingQueue' + }, + queues: [ + { name: existingQueueName, connection: 'passiveEnabledWithExistingQueue' } + ] + }).then(() => { + return rabbit.addQueue(existingQueueName, { passive: true }, 'passiveEnabledWithExistingQueue'); + }); + }); + + after(function () { + return rabbit.deleteQueue(existingQueueName, 'passiveEnabledWithExistingQueue') + .then(() => { + rabbit.reset(); + return rabbit.shutdown(); + }); + }); + }); +}); diff --git a/src/amqp/queue.js b/src/amqp/queue.js index 60b38a8..d36191f 100644 --- a/src/amqp/queue.js +++ b/src/amqp/queue.js @@ -45,16 +45,24 @@ function define (channel, options, subscriber, connectionName) { deadletter: 'deadLetterExchange', deadLetter: 'deadLetterExchange', deadLetterRoutingKey: 'deadLetterRoutingKey' - }, 'subscribe', 'limit', 'noBatch', 'unique'); + }, 'subscribe', 'limit', 'noBatch', 'unique', 'passive'); topLog.info("Declaring queue '%s' on connection '%s' with the options: %s", options.uniqueName, connectionName, JSON.stringify(options)); - return channel.assertQueue(options.uniqueName, valid) - .then(function (q) { - if (options.limit) { - channel.prefetch(options.limit); - } - return q; - }); + + let queuePromise; + + if (options.passive) { + queuePromise = channel.checkQueue(options.uniqueName); + } else { + queuePromise = channel.assertQueue(options.uniqueName, valid); + } + + return queuePromise.then(function (q) { + if (options.limit) { + channel.prefetch(options.limit); + } + return q; + }); } function finalize (channel, messages) { diff --git a/src/queueFsm.js b/src/queueFsm.js index 134b82c..f041556 100644 --- a/src/queueFsm.js +++ b/src/queueFsm.js @@ -29,6 +29,7 @@ var Factory = function (options, connection, topology, serializers, queueFn) { var Fsm = machina.Fsm.extend({ name: options.name, uniqueName: options.uniqueName, + passive: options.passive, responseSubscriptions: {}, signalSubscription: undefined, subscribed: false,