diff --git a/lib/Endpoint.js b/lib/Endpoint.js index 1f2bc57..958fa75 100644 --- a/lib/Endpoint.js +++ b/lib/Endpoint.js @@ -110,6 +110,28 @@ class Endpoint { let consumeResult try { + const worker = await this._remit._workers.acquire() + + try { + await worker.assertQueue(this._options.queue, { + exclusive: false, + durable: true, + autoDelete: true, + maxPriority: 10 + }) + + this._remit._workers.release(worker) + } catch (e) { + this._remit._workers.destroy(worker) + throw e + } + + await this._consumer.bindQueue( + this._options.queue, + this._remit._exchange, + this._options.event + ) + consumeResult = await this._consumer.consume( this._options.queue, this._remit._namespace.bind(this._incoming.bind(this)), @@ -225,22 +247,6 @@ class Endpoint { this._starting = true try { - const worker = await this._remit._workers.acquire() - - try { - await worker.assertQueue(queue, { - exclusive: false, - durable: true, - autoDelete: false, - maxPriority: 10 - }) - - this._remit._workers.release(worker) - } catch (e) { - delete this._starting - this._remit._workers.destroy(worker) - throw e - } const connection = await this._remit._connection this._consumer = await connection.createChannel() @@ -253,12 +259,6 @@ class Endpoint { this._consumer.prefetch(prefetch, true) } - await this._consumer.bindQueue( - queue, - this._remit._exchange, - event - ) - await this.resume() delete this._starting diff --git a/lib/Remit.js b/lib/Remit.js index 6848436..0d2d102 100644 --- a/lib/Remit.js +++ b/lib/Remit.js @@ -13,6 +13,7 @@ const Listener = require('./Listener') const Request = require('./Request') const Emitter = require('./Emitter') const { createNamespace } = require('cls-hooked') +const parseEvent = require('../utils/parseEvent') class Remit { constructor (options = {}) { @@ -79,9 +80,17 @@ class Remit { return connection } - async _incoming (message) { + async _incoming (isReturn, message) { if (!message) { await throwAsException(new Error('Request reply consumer cancelled unexpectedly; this was most probably done via RabbitMQ\'s management panel')) + } + + if (isReturn) { + return this._emitter.emit(`return-${message.properties.correlationId}`, message, { + event: parseEvent(message.properties, message.fields), + code: 'no_route', + message: `Request could not be routed to any endpoints. This signifies no matching endpoints are currently running.` + }) } try { diff --git a/lib/Request.js b/lib/Request.js index 404c8d3..bedea33 100644 --- a/lib/Request.js +++ b/lib/Request.js @@ -195,11 +195,12 @@ class Request extends CallableInstance { publishChannel.on('error', console.error) publishChannel.on('close', () => { throwAsException(new Error('Reply consumer died - this is most likely due to the RabbitMQ connection dying')) - }) + }) + publishChannel.on('return', this._remit._namespace.bind(this._remit._incoming.bind(this._remit, true))) await publishChannel.consume( 'amq.rabbitmq.reply-to', - this._remit._namespace.bind(this._remit._incoming.bind(this._remit)), + this._remit._namespace.bind(this._remit._incoming.bind(this._remit, false)), { noAck: true, exclusive: true @@ -216,7 +217,7 @@ class Request extends CallableInstance { } _waitForResult (messageId, span) { - const types = ['data', 'timeout'] + const types = ['data', 'timeout', 'return'] return new Promise((resolve, reject) => { const cleanUp = (message, err, result) => { diff --git a/test/endpoint.test.js b/test/endpoint.test.js index c4a4146..46106b9 100644 --- a/test/endpoint.test.js +++ b/test/endpoint.test.js @@ -518,8 +518,8 @@ describe('Endpoint', function () { } catch (e) { errorCaught = true expect(e).to.be.an('object') - expect(e).to.have.property('code', 'request_timedout') - expect(e).to.have.property('message', 'Request timed out after no response for 2000ms') + expect(e).to.have.property('code', 'no_route') + expect(e).to.have.property('message', 'Request could not be routed to any endpoints. This signifies no matching endpoints are currently running.') } expect(errorCaught).to.equal(true) @@ -555,8 +555,8 @@ describe('Endpoint', function () { } catch (e) { errorCaught = true expect(e).to.be.an('object') - expect(e).to.have.property('code', 'request_timedout') - expect(e).to.have.property('message', 'Request timed out after no response for 2000ms') + expect(e).to.have.property('code', 'no_route') + expect(e).to.have.property('message', 'Request could not be routed to any endpoints. This signifies no matching endpoints are currently running.') } expect(errorCaught).to.equal(true) diff --git a/test/request.test.js b/test/request.test.js index fb4f243..42b37e0 100644 --- a/test/request.test.js +++ b/test/request.test.js @@ -139,7 +139,11 @@ describe('Request', function () { this.slow(2000) const request = remit.request('timeout-test') - request.options({timeout: 1000}) + request.options({timeout: 1000}) + + await remit.endpoint('timeout-test') + .handler((e, cb) => setTimeout(cb, 3000)) + .start() try { await request()