From 5d94a1f4cf397a7caea8cb4d03049ca80b8a4599 Mon Sep 17 00:00:00 2001 From: Jack Williams Date: Sat, 30 Nov 2019 23:16:29 +0000 Subject: [PATCH 1/4] requests are now immediately returned if they can't be routed --- lib/Remit.js | 11 ++++++++++- lib/Request.js | 7 ++++--- test/endpoint.test.js | 8 ++++---- test/request.test.js | 6 +++++- 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/lib/Remit.js b/lib/Remit.js index 6848436..0d2d102 100644 --- a/lib/Remit.js +++ b/lib/Remit.js @@ -13,6 +13,7 @@ const Listener = require('./Listener') const Request = require('./Request') const Emitter = require('./Emitter') const { createNamespace } = require('cls-hooked') +const parseEvent = require('../utils/parseEvent') class Remit { constructor (options = {}) { @@ -79,9 +80,17 @@ class Remit { return connection } - async _incoming (message) { + async _incoming (isReturn, message) { if (!message) { await throwAsException(new Error('Request reply consumer cancelled unexpectedly; this was most probably done via RabbitMQ\'s management panel')) + } + + if (isReturn) { + return this._emitter.emit(`return-${message.properties.correlationId}`, message, { + event: parseEvent(message.properties, message.fields), + code: 'no_route', + message: `Request could not be routed to any endpoints. This signifies no matching endpoints are currently running.` + }) } try { diff --git a/lib/Request.js b/lib/Request.js index 404c8d3..bedea33 100644 --- a/lib/Request.js +++ b/lib/Request.js @@ -195,11 +195,12 @@ class Request extends CallableInstance { publishChannel.on('error', console.error) publishChannel.on('close', () => { throwAsException(new Error('Reply consumer died - this is most likely due to the RabbitMQ connection dying')) - }) + }) + publishChannel.on('return', this._remit._namespace.bind(this._remit._incoming.bind(this._remit, true))) await publishChannel.consume( 'amq.rabbitmq.reply-to', - this._remit._namespace.bind(this._remit._incoming.bind(this._remit)), + this._remit._namespace.bind(this._remit._incoming.bind(this._remit, false)), { noAck: true, exclusive: true @@ -216,7 +217,7 @@ class Request extends CallableInstance { } _waitForResult (messageId, span) { - const types = ['data', 'timeout'] + const types = ['data', 'timeout', 'return'] return new Promise((resolve, reject) => { const cleanUp = (message, err, result) => { diff --git a/test/endpoint.test.js b/test/endpoint.test.js index c4a4146..46106b9 100644 --- a/test/endpoint.test.js +++ b/test/endpoint.test.js @@ -518,8 +518,8 @@ describe('Endpoint', function () { } catch (e) { errorCaught = true expect(e).to.be.an('object') - expect(e).to.have.property('code', 'request_timedout') - expect(e).to.have.property('message', 'Request timed out after no response for 2000ms') + expect(e).to.have.property('code', 'no_route') + expect(e).to.have.property('message', 'Request could not be routed to any endpoints. This signifies no matching endpoints are currently running.') } expect(errorCaught).to.equal(true) @@ -555,8 +555,8 @@ describe('Endpoint', function () { } catch (e) { errorCaught = true expect(e).to.be.an('object') - expect(e).to.have.property('code', 'request_timedout') - expect(e).to.have.property('message', 'Request timed out after no response for 2000ms') + expect(e).to.have.property('code', 'no_route') + expect(e).to.have.property('message', 'Request could not be routed to any endpoints. This signifies no matching endpoints are currently running.') } expect(errorCaught).to.equal(true) diff --git a/test/request.test.js b/test/request.test.js index fb4f243..42b37e0 100644 --- a/test/request.test.js +++ b/test/request.test.js @@ -139,7 +139,11 @@ describe('Request', function () { this.slow(2000) const request = remit.request('timeout-test') - request.options({timeout: 1000}) + request.options({timeout: 1000}) + + await remit.endpoint('timeout-test') + .handler((e, cb) => setTimeout(cb, 3000)) + .start() try { await request() From 2012f5b371e2cd00cf3a1946e1de70d5eab38a71 Mon Sep 17 00:00:00 2001 From: Jack Williams Date: Sat, 30 Nov 2019 23:18:31 +0000 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=92=A5=20close=20endpoint=20queues=20?= =?UTF-8?q?when=20all=20consumers=20leave?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This has been long-awaited. With this, requests can "fail fast" and immediately return if no consumers are available instead of hanging around for 30 seconds waiting for another instance to pop up. This is, however, a breaking change. --- lib/Endpoint.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Endpoint.js b/lib/Endpoint.js index 1f2bc57..20578fe 100644 --- a/lib/Endpoint.js +++ b/lib/Endpoint.js @@ -231,7 +231,7 @@ class Endpoint { await worker.assertQueue(queue, { exclusive: false, durable: true, - autoDelete: false, + autoDelete: true, maxPriority: 10 }) From 9ac1e02a2541aa4b89297b1f2ede5a2a7014f6d8 Mon Sep 17 00:00:00 2001 From: Jack Williams Date: Sun, 1 Dec 2019 18:10:03 +0000 Subject: [PATCH 3/4] endpoints now assert and bind queue on resume This solves an issue where an endpoint loses all consumers, closes the queue and then a consumer attempts to "resume" and can't find the queue any more. Technically this could've happened previous to this `mandatory` change in a particular set of circumstances, so this is a good fix. --- lib/Endpoint.js | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/Endpoint.js b/lib/Endpoint.js index 20578fe..6b5a10d 100644 --- a/lib/Endpoint.js +++ b/lib/Endpoint.js @@ -110,6 +110,28 @@ class Endpoint { let consumeResult try { + const worker = await this._remit._workers.acquire() + + try { + await worker.assertQueue(this._options.queue, { + exclusive: false, + durable: true, + autoDelete: false, + maxPriority: 10 + }) + + this._remit._workers.release(worker) + } catch (e) { + this._remit._workers.destroy(worker) + throw e + } + + await this._consumer.bindQueue( + this._options.queue, + this._remit._exchange, + this._options.event + ) + consumeResult = await this._consumer.consume( this._options.queue, this._remit._namespace.bind(this._incoming.bind(this)), @@ -225,22 +247,6 @@ class Endpoint { this._starting = true try { - const worker = await this._remit._workers.acquire() - - try { - await worker.assertQueue(queue, { - exclusive: false, - durable: true, - autoDelete: true, - maxPriority: 10 - }) - - this._remit._workers.release(worker) - } catch (e) { - delete this._starting - this._remit._workers.destroy(worker) - throw e - } const connection = await this._remit._connection this._consumer = await connection.createChannel() @@ -253,12 +259,6 @@ class Endpoint { this._consumer.prefetch(prefetch, true) } - await this._consumer.bindQueue( - queue, - this._remit._exchange, - event - ) - await this.resume() delete this._starting From 3b2f02d28eff5d71c26a5e2cdbbeeb564208d616 Mon Sep 17 00:00:00 2001 From: Jack Williams Date: Sun, 1 Dec 2019 18:16:21 +0000 Subject: [PATCH 4/4] reset autoDelete to true after bad merge --- lib/Endpoint.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Endpoint.js b/lib/Endpoint.js index 6b5a10d..958fa75 100644 --- a/lib/Endpoint.js +++ b/lib/Endpoint.js @@ -116,7 +116,7 @@ class Endpoint { await worker.assertQueue(this._options.queue, { exclusive: false, durable: true, - autoDelete: false, + autoDelete: true, maxPriority: 10 })